From ce645d4fd3dfe8023cb58325352176aae845363d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
Date: Mon, 2 May 2022 08:49:47 -0400
Subject: [PATCH] conference: identify mixed videos per streamId

Videos in the videoMixer_ were not easy to identify. Moreover, the
"sinkId" in the participant information was the concatenation of the
confId and a URI, which is a problem when several devices from the same
account are present in the conference. Finally, the active-stream logic
was messy, with two different variables used to identify the active
stream in the mixer.

This patch introduces a streamId of the form callId_type_idx
(e.g. ca111412_video_0, ca111412_audio_2), so every video shown in the
conference is identified via a unique ID. The active stream in the video
mixer is identified by this ID, not by the callId or a pointer.

This should not change any behaviour, but prepares for multistream.

https://git.jami.net/savoirfairelinux/jami-project/-/issues/1429

Change-Id: I250dd31ad1ea92ed1fd1e94bec2f5abd311d2128
---
 src/client/callmanager.cpp            |   9 +-
 src/conference.cpp                    | 114 ++++++------------
 src/conference.h                      |   5 --
 src/conference_protocol.cpp           |   4 +-
 src/conference_protocol.h             |   7 +-
 src/media/audio/audio_rtp_session.cpp |  20 ++---
 src/media/audio/audio_rtp_session.h   |   2 +-
 src/media/audio/audio_sender.cpp      |   6 +-
 src/media/audio/audio_sender.h        |   4 +-
 src/media/rtp_session.h               |  11 ++-
 src/media/video/video_mixer.cpp       |  86 ++++++++++---------
 src/media/video/video_mixer.h         |  51 ++++++++----
 src/media/video/video_rtp_session.cpp |  63 ++++++++------
 src/media/video/video_rtp_session.h   |   2 +-
 src/sip/sip_utils.cpp                 |   8 ++
 src/sip/sip_utils.h                   |   3 +
 src/sip/sipcall.cpp                   |  47 ++++++---
 src/sip/sipcall.h                     |   1 +
 18 files changed, 222 insertions(+), 221 deletions(-)

diff --git a/src/client/callmanager.cpp b/src/client/callmanager.cpp
index 9735501e726..11a65f1d324 100644
--- a/src/client/callmanager.cpp
+++ b/src/client/callmanager.cpp
@@ -578,14 +578,7 @@ setActiveStream(const std::string& accountId,
 {
     if (const auto account = jami::Manager::instance().getAccount(accountId)) {
         if (auto conf = account->getConference(confId)) {
-            // TODO, as for now the videoMixer doesn't have the streamId
-            if (deviceId == account->currentDeviceId() && accountUri == account->getUsername()) {
-                conf->setActiveStream("", state);
-            } else if (auto call = std::static_pointer_cast<SIPCall>(
-                           conf->getCallFromPeerID(accountUri))) {
-                conf->setActiveStream(call->getCallId(), state);
-            }
-            // conf->setActiveStream(streamId, state);
+            conf->setActiveStream(streamId, state);
         } else if (auto call = account->getCall(confId)) {
             if (call->conferenceProtocolVersion() == 1) {
                 Json::Value sinkVal;
diff --git a/src/conference.cpp b/src/conference.cpp
index 71f077aa1c1..7583bcf3cf8 100644
--- a/src/conference.cpp
+++ b/src/conference.cpp
@@ -109,14 +109,14 @@ Conference::Conference(const std::shared_ptr& account)
         }
         auto hostAdded = false;
         // Handle participants showing their video
-        std::unique_lock lk(shared->videoToCallMtx_);
         for (const auto& info : infos) {
             std::string uri {};
             bool isLocalMuted = false;
             std::string deviceId {};
             auto active = false;
-            if (!info.id.empty()) {
-                if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(info.id))) {
+            if (!info.callId.empty()) {
+                std::string callId = info.callId;
+                if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(callId))) {
                     uri = call->getPeerNumber();
                     isLocalMuted = call->isPeerMuted();
                     if (auto* transport = call->getTransport())
@@ -125,13 +125,12 @@ Conference::Conference(const std::shared_ptr&
account) std::string_view peerId = string_remove_suffix(uri, '@'); auto isModerator = shared->isModerator(peerId); auto isHandRaised = shared->isHandRaised(deviceId); - auto isModeratorMuted = shared->isMuted(info.id); - auto sinkId = shared->getConfId() + peerId; + auto isModeratorMuted = shared->isMuted(callId); if (auto videoMixer = shared->videoMixer_) - active = videoMixer->verifyActive(info.id); // TODO streamId + active = videoMixer->verifyActive(info.streamId); newInfo.emplace_back(ParticipantInfo {std::move(uri), deviceId, - std::move(sinkId), + std::move(info.streamId), active, info.x, info.y, @@ -143,30 +142,31 @@ Conference::Conference(const std::shared_ptr& account) isModerator, isHandRaised}); } else { - auto it = shared->videoToCall_.find(info.source); - if (it == shared->videoToCall_.end()) - it = shared->videoToCall_.emplace_hint(it, info.source, std::string()); - // If not local auto isModeratorMuted = false; - if (!it->second.empty()) { + // If not local + auto streamInfo = shared->videoMixer_->streamInfo(info.source); + std::string streamId = streamInfo.streamId; + if (!streamId.empty()) { // Retrieve calls participants // TODO: this is a first version, we assume that the peer is not // a master of a conference and there is only one remote // In the future, we should retrieve confInfo from the call // To merge layouts informations - // TODO sinkId - auto isModeratorMuted = shared->isMuted(it->second); + isModeratorMuted = shared->isMuted(streamId); if (auto videoMixer = shared->videoMixer_) - active = videoMixer->verifyActive(it->second); - if (auto call = std::dynamic_pointer_cast(getCall(it->second))) { + active = videoMixer->verifyActive(streamId); + if (auto call = std::dynamic_pointer_cast(getCall(streamInfo.callId))) { uri = call->getPeerNumber(); isLocalMuted = call->isPeerMuted(); if (auto* transport = call->getTransport()) deviceId = transport->deviceId(); } + } else { + streamId = sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO); + if (auto videoMixer = shared->videoMixer_) + active = videoMixer->verifyActive(streamId); } std::string_view peerId = string_remove_suffix(uri, '@'); - // TODO (another patch): use deviceId instead of peerId as specified in protocol auto isModerator = shared->isModerator(peerId); if (uri.empty() && !hostAdded) { hostAdded = true; @@ -175,12 +175,9 @@ Conference::Conference(const std::shared_ptr& account) isLocalMuted = shared->isMediaSourceMuted(MediaType::MEDIA_AUDIO); } auto isHandRaised = shared->isHandRaised(deviceId); - auto sinkId = shared->getConfId() + peerId; - if (auto videoMixer = shared->videoMixer_) - active |= videoMixer->verifyActive(info.source); newInfo.emplace_back(ParticipantInfo {std::move(uri), deviceId, - std::move(sinkId), + std::move(streamId), active, info.x, info.y, @@ -197,7 +194,6 @@ Conference::Conference(const std::shared_ptr& account) newInfo.h = videoMixer->getHeight(); newInfo.w = videoMixer->getWidth(); } - lk.unlock(); if (!hostAdded) { ParticipantInfo pi; pi.videoMuted = true; @@ -218,10 +214,8 @@ Conference::Conference(const std::shared_ptr& account) }); parser_.onRaiseHand( [&](const auto& deviceId, bool state) { setHandRaised(deviceId, state); }); - parser_.onSetActiveStream([&](const auto& accountUri, const auto& deviceId, bool state) { - // TODO replace per streamId - if (auto call = getCallWith(accountUri, deviceId)) - setHandRaised(call->getCallId(), state); + parser_.onSetActiveStream([&](const auto& streamId, bool state) { + setActiveStream(streamId, state); }); 
parser_.onMuteStreamAudio ( @@ -616,11 +610,12 @@ Conference::handleMediaChangeRequest(const std::shared_ptr& call, #ifdef ENABLE_VIDEO // If the new media list has video, remove the participant from audioonlylist. - if (MediaAttribute::hasMediaType(MediaAttribute::buildMediaAttributesList(remoteMediaList, - false), - MediaType::MEDIA_VIDEO)) { - if (videoMixer_) - videoMixer_->removeAudioOnlySource(call->getCallId()); + if (videoMixer_ && MediaAttribute::hasMediaType( + MediaAttribute::buildMediaAttributesList(remoteMediaList, false), + MediaType::MEDIA_VIDEO)) { + auto callId = call->getCallId(); + videoMixer_->removeAudioOnlySource(callId, + std::string(sip_utils::streamId(callId, 0, MediaType::MEDIA_VIDEO))); } #endif @@ -708,10 +703,8 @@ Conference::addParticipant(const std::string& participant_id) // In conference, if a participant joins with an audio only // call, it must be listed in the audioonlylist. auto mediaList = call->getMediaAttributeList(); - if (not MediaAttribute::hasMediaType(mediaList, MediaType::MEDIA_VIDEO)) { - if (videoMixer_) { - videoMixer_->addAudioOnlySource(call->getCallId()); - } + if (videoMixer_ && not MediaAttribute::hasMediaType(mediaList, MediaType::MEDIA_VIDEO)) { + videoMixer_->addAudioOnlySource(call->getCallId(), sip_utils::streamId(call->getCallId(), 0, MediaType::MEDIA_AUDIO)); } call->enterConference(shared_from_this()); // Continue the recording for the conference if one participant was recording @@ -739,14 +732,11 @@ Conference::setActiveParticipant(const std::string& participant_id) if (!videoMixer_) return; if (isHost(participant_id)) { - videoMixer_->addActiveHost(); + videoMixer_->setActiveStream(sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO)); return; } if (auto call = getCallFromPeerID(participant_id)) { - if (auto videoRecv = call->getReceiveVideoFrameActiveWriter()) - videoMixer_->setActiveStream(videoRecv.get()); - else - videoMixer_->setActiveStream(call->getCallId()); + videoMixer_->setActiveStream(sip_utils::streamId(call->getCallId(), 0, MediaType::MEDIA_VIDEO)); return; } @@ -764,25 +754,13 @@ Conference::setActiveParticipant(const std::string& participant_id) void Conference::setActiveStream(const std::string& streamId, bool state) { - // TODO BUG: for now activeStream is the callId, and should be the sink! 
#ifdef ENABLE_VIDEO if (!videoMixer_) return; - if (state) { - // TODO remove - if (streamId.empty()) { - videoMixer_->addActiveHost(); - } else if (auto call = getCall(streamId)) { - if (auto videoRecv = call->getReceiveVideoFrameActiveWriter()) - videoMixer_->setActiveStream(videoRecv.get()); - else - videoMixer_->setActiveStream(call->getCallId()); - return; - } - // TODO videoMixer_->setActiveStream(sinkId); - } else { + if (state) + videoMixer_->setActiveStream(streamId); + else videoMixer_->resetActiveStream(); - } #endif } @@ -872,27 +850,6 @@ Conference::createSinks(const ConfInfo& infos) sink), confSinksMap_); } - -void -Conference::attachVideo(Observable>* frame, const std::string& callId) -{ - JAMI_DBG("[conf:%s] attaching video of call %s", id_.c_str(), callId.c_str()); - std::lock_guard lk(videoToCallMtx_); - videoToCall_.emplace(frame, callId); - frame->attach(videoMixer_.get()); -} - -void -Conference::detachVideo(Observable>* frame) -{ - std::lock_guard lk(videoToCallMtx_); - auto it = videoToCall_.find(frame); - if (it != videoToCall_.end()) { - JAMI_DBG("[conf:%s] detaching video of call %s", id_.c_str(), it->second.c_str()); - it->first->detach(videoMixer_.get()); - videoToCall_.erase(it); - } -} #endif void @@ -911,8 +868,8 @@ Conference::removeParticipant(const std::string& participant_id) #ifdef ENABLE_VIDEO auto sinkId = getConfId() + peerId; // Remove if active - // TODO if (videoMixer_->verifyActive(sinkId)) - if (videoMixer_->verifyActive(participant_id)) + // TODO all streams + if (videoMixer_->verifyActive(sip_utils::streamId(participant_id, 0, MediaType::MEDIA_VIDEO))) videoMixer_->resetActiveStream(); call->exitConference(); if (call->isPeerRecording()) @@ -1252,7 +1209,6 @@ Conference::setHandRaised(const std::string& deviceId, const bool& state) if (auto* transport = call->getTransport()) callDeviceId = transport->deviceId(); if (deviceId == callDeviceId) { - JAMI_ERR() << "@@@X"; if (state and not isPeerRequiringAttention) { JAMI_DBG("Raise %s hand", deviceId.c_str()); handsRaised_.emplace(deviceId); diff --git a/src/conference.h b/src/conference.h index a992ab40d37..683a965bade 100644 --- a/src/conference.h +++ b/src/conference.h @@ -335,8 +335,6 @@ class Conference : public Recordable, public std::enable_shared_from_this>* frame, const std::string& callId); - void detachVideo(Observable>* frame); std::shared_ptr getVideoMixer(); std::string getVideoInput() const { return hostVideoSource_.sourceUri_; } #endif @@ -400,9 +398,6 @@ class Conference : public Recordable, public std::enable_shared_from_this>*, std::string> videoToCall_ {}; std::shared_ptr ghostRingBuffer_; #ifdef ENABLE_VIDEO diff --git a/src/conference_protocol.cpp b/src/conference_protocol.cpp index 93a28b3a414..78d5d6de482 100644 --- a/src/conference_protocol.cpp +++ b/src/conference_protocol.cpp @@ -158,9 +158,7 @@ ConfProtocolParser::parseV1() mediaVal[ProtocolKeys::MUTEAUDIO].asBool()); } if (mediaVal.isMember(ProtocolKeys::ACTIVE)) { - // TODO streamId - setActiveStream_(accountUri, - deviceId, + setActiveStream_(streamId, mediaVal[ProtocolKeys::ACTIVE].asBool()); } } diff --git a/src/conference_protocol.h b/src/conference_protocol.h index cee480826f3..f43e13074ff 100644 --- a/src/conference_protocol.h +++ b/src/conference_protocol.h @@ -59,10 +59,7 @@ class ConfProtocolParser { { raiseHand_ = std::move(cb); } - /** - * @todo, replace the 2 strings per streamId - */ - void onSetActiveStream(std::function&& cb) + void onSetActiveStream(std::function&& cb) { setActiveStream_ = 
std::move(cb); } @@ -125,7 +122,7 @@ class ConfProtocolParser { std::function checkAuthorization_; std::function hangupParticipant_; std::function raiseHand_; - std::function setActiveStream_; + std::function setActiveStream_; std::function muteStreamAudio_; std::function diff --git a/src/media/audio/audio_rtp_session.cpp b/src/media/audio/audio_rtp_session.cpp index 49bca44e7f6..928117d117b 100644 --- a/src/media/audio/audio_rtp_session.cpp +++ b/src/media/audio/audio_rtp_session.cpp @@ -48,21 +48,21 @@ namespace jami { -AudioRtpSession::AudioRtpSession(const std::string& id) - : RtpSession(id, MediaType::MEDIA_AUDIO) +AudioRtpSession::AudioRtpSession(const std::string& callId, const std::string& streamId) + : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO) , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {}) { - JAMI_DBG("Created Audio RTP session: %p - call Id %s", this, callID_.c_str()); + JAMI_DBG("Created Audio RTP session: %p - call Id %s", this, callId_.c_str()); // don't move this into the initializer list or Cthulus will emerge - ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(callID_); + ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(callId_); } AudioRtpSession::~AudioRtpSession() { stop(); - JAMI_DBG("Destroyed Audio RTP session: %p - call Id %s", this, callID_.c_str()); + JAMI_DBG("Destroyed Audio RTP session: %p - call Id %s", this, callId_.c_str()); } void @@ -90,7 +90,7 @@ AudioRtpSession::startSender() audioInput_->detach(sender_.get()); // sender sets up input correctly, we just keep a reference in case startSender is called - audioInput_ = jami::getAudioInput(callID_); + audioInput_ = jami::getAudioInput(callId_); audioInput_->setMuted(muteState_); audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_); auto newParams = audioInput_->switchInput(input_); @@ -117,7 +117,7 @@ AudioRtpSession::startSender() sender_.reset(); socketPair_->stopSendOp(false); sender_.reset( - new AudioSender(callID_, getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_)); + new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_)); } catch (const MediaEncoderException& e) { JAMI_ERR("%s", e.what()); send_.enabled = false; @@ -160,7 +160,7 @@ AudioRtpSession::startReceiver() JAMI_WARN("Restarting audio receiver"); auto accountAudioCodec = std::static_pointer_cast(receive_.codec); - receiveThread_.reset(new AudioReceiveThread(callID_, + receiveThread_.reset(new AudioReceiveThread(callId_, accountAudioCodec->audioformat, receive_.receiving_sdp, mtu_)); @@ -332,7 +332,7 @@ AudioRtpSession::initRecorder(std::shared_ptr& rec) { if (receiveThread_) receiveThread_->attach(rec->addStream(receiveThread_->getInfo())); - if (auto input = jami::getAudioInput(callID_)) + if (auto input = jami::getAudioInput(callId_)) input->attach(rec->addStream(input->getInfo())); } @@ -344,7 +344,7 @@ AudioRtpSession::deinitRecorder(std::shared_ptr& rec) receiveThread_->detach(ob); } } - if (auto input = jami::getAudioInput(callID_)) { + if (auto input = jami::getAudioInput(callId_)) { if (auto ob = rec->getStream(input->getInfo().name)) { input->detach(ob); } diff --git a/src/media/audio/audio_rtp_session.h b/src/media/audio/audio_rtp_session.h index fb04899c1bf..9385b26021b 100644 --- a/src/media/audio/audio_rtp_session.h +++ b/src/media/audio/audio_rtp_session.h @@ -50,7 +50,7 @@ struct RTCPInfo class AudioRtpSession : public RtpSession { public: - AudioRtpSession(const std::string& id); + AudioRtpSession(const 
std::string& callId, const std::string& streamId); virtual ~AudioRtpSession(); void start(std::unique_ptr rtp_sock, std::unique_ptr rtcp_sock) override; diff --git a/src/media/audio/audio_sender.cpp b/src/media/audio/audio_sender.cpp index a67e3863be1..2f1122b4328 100644 --- a/src/media/audio/audio_sender.cpp +++ b/src/media/audio/audio_sender.cpp @@ -33,14 +33,12 @@ namespace jami { -AudioSender::AudioSender(const std::string& id, - const std::string& dest, +AudioSender::AudioSender(const std::string& dest, const MediaDescription& args, SocketPair& socketPair, const uint16_t seqVal, const uint16_t mtu) - : id_(id) - , dest_(dest) + : dest_(dest) , args_(args) , seqVal_(seqVal) , mtu_(mtu) diff --git a/src/media/audio/audio_sender.h b/src/media/audio/audio_sender.h index d48c53dccce..6dc58e3f6f7 100644 --- a/src/media/audio/audio_sender.h +++ b/src/media/audio/audio_sender.h @@ -37,8 +37,7 @@ class Resampler; class AudioSender : public Observer> { public: - AudioSender(const std::string& id, - const std::string& dest, + AudioSender(const std::string& dest, const MediaDescription& args, SocketPair& socketPair, const uint16_t seqVal, @@ -56,7 +55,6 @@ class AudioSender : public Observer> bool setup(SocketPair& socketPair); - std::string id_; std::string dest_; MediaDescription args_; std::unique_ptr audioEncoder_; diff --git a/src/media/rtp_session.h b/src/media/rtp_session.h index 715b8841b3d..453befe8cdd 100644 --- a/src/media/rtp_session.h +++ b/src/media/rtp_session.h @@ -42,8 +42,10 @@ class RtpSession // Media direction enum class Direction { SEND, RECV }; - RtpSession(const std::string& callID, MediaType type) - : callID_(callID) + // Note: callId is used for ring buffers and smarttools + RtpSession(const std::string& callId, const std::string& streamId, MediaType type) + : callId_(callId) + , streamId_(streamId) , mediaType_(type) {} virtual ~RtpSession() {}; @@ -82,9 +84,12 @@ class RtpSession const IpAddr& getSendAddr() const { return send_.addr; }; const IpAddr& getRecvAddr() const { return receive_.addr; }; + inline std::string streamId() const { return streamId_; } + protected: std::recursive_mutex mutex_; - const std::string callID_; + const std::string callId_; + const std::string streamId_; MediaType mediaType_; std::unique_ptr socketPair_; std::string input_ {}; diff --git a/src/media/video/video_mixer.cpp b/src/media/video/video_mixer.cpp index d0f2515d91d..8ece82fbbd1 100644 --- a/src/media/video/video_mixer.cpp +++ b/src/media/video/video_mixer.cpp @@ -32,6 +32,7 @@ #ifdef RING_ACCEL #include "accel.h" #endif +#include "sip/sip_utils.h" #include #include @@ -88,8 +89,7 @@ VideoMixer::VideoMixer(const std::string& id, const std::string& localInput) // Local video camera is the main participant if (not localInput.empty()) videoLocal_ = getVideoInput(localInput); - if (videoLocal_) - videoLocal_->attach(this); + attachVideo(videoLocal_.get(), "", sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO)); loop_.start(); nextProcess_ = std::chrono::steady_clock::now(); @@ -100,16 +100,10 @@ VideoMixer::~VideoMixer() { stop_sink(); - if (videoLocal_) { - videoLocal_->detach(this); - // prefer to release it now than after the next join - videoLocal_.reset(); - } - if (videoLocalSecondary_) { - videoLocalSecondary_->detach(this); - // prefer to release it now than after the next join - videoLocalSecondary_.reset(); - } + detachVideo(videoLocal_.get()); + videoLocal_.reset(); + detachVideo(videoLocalSecondary_.get()); + videoLocalSecondary_.reset(); loop_.join(); @@ -139,8 +133,7 @@ 
VideoMixer::switchInput(const std::string& input) // Re-attach videoInput to mixer videoLocal_ = getVideoInput(input); - if (videoLocal_) - videoLocal_->attach(this); + attachVideo(videoLocal_.get(), "", sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO)); } void @@ -164,9 +157,7 @@ VideoMixer::switchSecondaryInput(const std::string& input) // Re-attach videoInput to mixer videoLocalSecondary_ = getVideoInput(input); - if (videoLocalSecondary_) { - videoLocalSecondary_->attach(this); - } + attachVideo(videoLocalSecondary_.get(), "", sip_utils::streamId("", 1, MediaType::MEDIA_VIDEO)); } void @@ -178,35 +169,50 @@ VideoMixer::stopInput() } void -VideoMixer::addActiveHost() +VideoMixer::setActiveStream(const std::string& id) { - activeStream_ = ""; - activeSource_ = videoLocalSecondary_ ? videoLocalSecondary_.get() : videoLocal_.get(); + activeStream_ = id; updateLayout(); } void -VideoMixer::setActiveStream(Observable>* ob) +VideoMixer::updateLayout() { - activeStream_ = ""; - activeSource_ = ob; - updateLayout(); + if (activeStream_ == "") + currentLayout_ = Layout::GRID; + layoutUpdated_ += 1; } void -VideoMixer::setActiveStream(const std::string& id) +VideoMixer::attachVideo(Observable>* frame, const std::string& callId, const std::string& streamId) { - activeSource_ = nullptr; - activeStream_ = id; - updateLayout(); + if (!frame) return; + JAMI_DBG("Attaching video with streamId %s", streamId.c_str()); + std::lock_guard lk(videoToStreamInfoMtx_); + videoToStreamInfo_.emplace(frame, StreamInfo {callId, streamId}); + frame->attach(this); } void -VideoMixer::updateLayout() +VideoMixer::detachVideo(Observable>* frame) { - if (activeStream_ == "" && activeSource_ == nullptr) - currentLayout_ = Layout::GRID; - layoutUpdated_ += 1; + if (!frame) + return; + bool detach = false; + std::unique_lock lk(videoToStreamInfoMtx_); + auto it = videoToStreamInfo_.find(frame); + if (it != videoToStreamInfo_.end()) { + JAMI_DBG("Detaching video of call %s", it->second.callId.c_str()); + detach = true; + // Handle the case where the current shown source leave the conference + // Note, do not call resetActiveStream() to avoid multiple updates + if (verifyActive(it->second.streamId)) + activeStream_ = {}; + videoToStreamInfo_.erase(it); + } + lk.unlock(); + if (detach) + frame->detach(this); } void @@ -230,9 +236,6 @@ VideoMixer::detached(Observable>* ob) for (const auto& x : sources_) { if (x->source == ob) { - // Handle the case where the current shown source leave the conference - if (verifyActive(ob)) - resetActiveStream(); JAMI_DBG("Remove source [%p]", x.get()); sources_.remove(x); JAMI_DBG("Total sources: %lu", sources_.size()); @@ -303,11 +306,10 @@ VideoMixer::process() std::vector sourcesInfo; sourcesInfo.reserve(sources_.size() + audioOnlySources_.size()); // add all audioonlysources - for (auto& id : audioOnlySources_) { - JAMI_ERR() << "@@@ " << id; - auto active = verifyActive(id); + for (auto& [callId, streamId] : audioOnlySources_) { + auto active = verifyActive(streamId); if (currentLayout_ != Layout::ONE_BIG or active) { - sourcesInfo.emplace_back(SourceInfo {{}, 0, 0, 10, 10, false, id}); + sourcesInfo.emplace_back(SourceInfo {{}, 0, 0, 10, 10, false, callId, streamId}); } if (currentLayout_ == Layout::ONE_BIG and active) successfullyRendered = true; @@ -318,7 +320,8 @@ VideoMixer::process() if (!loop_.isRunning()) return; - auto activeSource = verifyActive(x->source); + auto sinfo = streamInfo(x->source); + auto activeSource = verifyActive(sinfo.streamId); if (currentLayout_ != 
Layout::ONE_BIG or activeSource) { // make rendered frame temporarily unavailable for update() // to avoid concurrent access. @@ -385,8 +388,9 @@ VideoMixer::process() layoutUpdated_ -= 1; if (layoutUpdated_ == 0) { for (auto& x : sources_) { + auto sinfo = streamInfo(x->source); sourcesInfo.emplace_back( - SourceInfo {x->source, x->x, x->y, x->w, x->h, x->hasVideo, {}}); + SourceInfo {x->source, x->x, x->y, x->w, x->h, x->hasVideo, sinfo.callId, sinfo.streamId}); } if (onSourcesUpdated_) onSourcesUpdated_(std::move(sourcesInfo)); diff --git a/src/media/video/video_mixer.h b/src/media/video/video_mixer.h index bbd982ddd40..6c014ea8c2f 100644 --- a/src/media/video/video_mixer.h +++ b/src/media/video/video_mixer.h @@ -37,6 +37,11 @@ namespace video { class SinkClient; +struct StreamInfo { + std::string callId; + std::string streamId; +}; + struct SourceInfo { Observable>* source; @@ -45,7 +50,8 @@ struct SourceInfo int w; int h; bool hasVideo; - std::string id; + std::string callId; + std::string streamId; }; using OnSourcesUpdatedCb = std::function&&)>; @@ -73,27 +79,18 @@ class VideoMixer : public VideoGenerator, public VideoFramePassiveReader void switchSecondaryInput(const std::string& input); void stopInput(); - void setActiveStream(Observable>* ob); void setActiveStream(const std::string& id); void resetActiveStream() { activeStream_ = {}; - activeSource_ = {}; updateLayout(); } - void addActiveHost(); - // TODO group, we can only use a set of string to identify actives bool verifyActive(const std::string& id) { return activeStream_ == id; } - bool verifyActive(Observable>* ob) - { - return activeSource_ == ob; - } - void setVideoLayout(Layout newLayout) { currentLayout_ = newLayout; @@ -114,18 +111,33 @@ class VideoMixer : public VideoGenerator, public VideoFramePassiveReader std::shared_ptr& getSink() { return sink_; } - void addAudioOnlySource(const std::string& id) + void addAudioOnlySource(const std::string& callId, const std::string& streamId) { - std::lock_guard lk(audioOnlySourcesMtx_); - audioOnlySources_.insert(id); + std::unique_lock lk(audioOnlySourcesMtx_); + audioOnlySources_.insert({callId, streamId}); + lk.unlock(); updateLayout(); } - void removeAudioOnlySource(const std::string& id) + void removeAudioOnlySource(const std::string& callId, const std::string& streamId) { - std::lock_guard lk(audioOnlySourcesMtx_); - if (audioOnlySources_.erase(id)) + std::unique_lock lk(audioOnlySourcesMtx_); + if (audioOnlySources_.erase({callId, streamId})) { + lk.unlock(); updateLayout(); + } + } + + void attachVideo(Observable>* frame, const std::string& callId, const std::string& streamId); + void detachVideo(Observable>* frame); + + StreamInfo streamInfo(Observable>* frame) const + { + std::lock_guard lk(videoToStreamInfoMtx_); + auto it = videoToStreamInfo_.find(frame); + if (it == videoToStreamInfo_.end()) + return {}; + return it->second; } private: @@ -162,11 +174,14 @@ class VideoMixer : public VideoGenerator, public VideoFramePassiveReader ThreadLoop loop_; // as to be last member Layout currentLayout_ {Layout::GRID}; - Observable>* activeSource_ {nullptr}; std::list> sources_; + // We need to convert call to frame + mutable std::mutex videoToStreamInfoMtx_ {}; + std::map>*, StreamInfo> videoToStreamInfo_ {}; + std::mutex audioOnlySourcesMtx_; - std::set audioOnlySources_; + std::set> audioOnlySources_; std::string activeStream_ {}; std::atomic_int layoutUpdated_ {0}; diff --git a/src/media/video/video_rtp_session.cpp b/src/media/video/video_rtp_session.cpp index 
df09ddbecac..b91f5198799 100644 --- a/src/media/video/video_rtp_session.cpp +++ b/src/media/video/video_rtp_session.cpp @@ -59,15 +59,17 @@ constexpr auto EXPIRY_TIME_RTCP = std::chrono::seconds(2); constexpr auto DELAY_AFTER_REMB_INC = std::chrono::seconds(1); constexpr auto DELAY_AFTER_REMB_DEC = std::chrono::milliseconds(500); -VideoRtpSession::VideoRtpSession(const string& callID, const DeviceParams& localVideoParams) - : RtpSession(callID, MediaType::MEDIA_VIDEO) +VideoRtpSession::VideoRtpSession(const string& callId, + const string& streamId, + const DeviceParams& localVideoParams) + : RtpSession(callId, streamId, MediaType::MEDIA_VIDEO) , localVideoParams_(localVideoParams) , videoBitrateInfo_ {} , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {}) { setupVideoBitrateInfo(); // reset bitrate cc = std::make_unique(); - JAMI_DBG("[%p] Video RTP session created for call %s", this, callID_.c_str()); + JAMI_DBG("[%p] Video RTP session created for call %s", this, callId_.c_str()); } VideoRtpSession::~VideoRtpSession() @@ -247,7 +249,7 @@ VideoRtpSession::startReceiver() if (receiveThread_) JAMI_WARN("[%p] Already has a receiver, restarting", this); receiveThread_.reset( - new VideoReceiveThread(callID_, !conference_, receive_.receiving_sdp, mtu_)); + new VideoReceiveThread(callId_, !conference_, receive_.receiving_sdp, mtu_)); // XXX keyframe requests can timeout if unanswered receiveThread_->addIOContext(*socketPair_); @@ -255,23 +257,27 @@ VideoRtpSession::startReceiver() receiveThread_->startLoop(); receiveThread_->setRequestKeyFrameCallback([this]() { cbKeyFrameRequest_(); }); receiveThread_->setRotation(rotation_.load()); - if (videoMixer_) { - auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get()) - || videoMixer_->verifyActive(callID_); - videoMixer_->removeAudioOnlySource(callID_); - if (activeParticipant) - videoMixer_->setActiveStream(receiveThread_.get()); + if (videoMixer_ and conference_) { + // Note, this should be managed differently, this is a bit hacky + auto audioId = streamId_; + string_replace(audioId, "video", "audio"); + auto activeStream = videoMixer_->verifyActive(audioId); + videoMixer_->removeAudioOnlySource(callId_, audioId); + if (activeStream) + videoMixer_->setActiveStream(streamId_); } } else { JAMI_DBG("[%p] Video receiver disabled", this); - if (receiveThread_ and videoMixer_) { - auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get()) - || videoMixer_->verifyActive(callID_); - videoMixer_->addAudioOnlySource(callID_); + if (receiveThread_ and videoMixer_ and conference_) { + // Note, this should be managed differently, this is a bit hacky + auto audioId_ = streamId_; + string_replace(audioId_, "video", "audio"); + auto activeStream = videoMixer_->verifyActive(streamId_); + videoMixer_->addAudioOnlySource(callId_, audioId_); receiveThread_->detach(videoMixer_.get()); - if (activeParticipant) - videoMixer_->setActiveStream(callID_); + if (activeStream) + videoMixer_->setActiveStream(audioId_); } } if (socketPair_) @@ -289,11 +295,13 @@ VideoRtpSession::stopReceiver() return; if (videoMixer_) { - auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get()) || videoMixer_->verifyActive(callID_); - videoMixer_->addAudioOnlySource(callID_); + auto activeStream = videoMixer_->verifyActive(streamId_); + auto audioId = streamId_; + string_replace(audioId, "video", "audio"); + videoMixer_->addAudioOnlySource(callId_, audioId); receiveThread_->detach(videoMixer_.get()); - if 
(activeParticipant) - videoMixer_->setActiveStream(callID_); + if (activeStream) + videoMixer_->setActiveStream(audioId); } // We need to disable the read operation, otherwise the @@ -463,7 +471,7 @@ VideoRtpSession::setupConferenceVideoPipeline(Conference& conference, Direction JAMI_DBG("[%p] Setup video sender pipeline on conference %s for call %s", this, conference.getConfId().c_str(), - callID_.c_str()); + callId_.c_str()); videoMixer_ = conference.getVideoMixer(); if (sender_) { // Swap sender from local video to conference video mixer @@ -478,10 +486,11 @@ VideoRtpSession::setupConferenceVideoPipeline(Conference& conference, Direction JAMI_DBG("[%p] Setup video receiver pipeline on conference %s for call %s", this, conference.getConfId().c_str(), - callID_.c_str()); + callId_.c_str()); if (receiveThread_) { receiveThread_->stopSink(); - conference.attachVideo(receiveThread_.get(), callID_); + if (videoMixer_) + videoMixer_->attachVideo(receiveThread_.get(), callId_, streamId_); } else { JAMI_WARN("[%p] no receiver", this); } @@ -545,11 +554,11 @@ VideoRtpSession::exitConference() videoMixer_->detach(sender_.get()); if (receiveThread_) { - auto activetParticipant = videoMixer_->verifyActive(receiveThread_.get()); - conference_->detachVideo(receiveThread_.get()); + auto activeStream = videoMixer_->verifyActive(streamId_); + videoMixer_->detachVideo(receiveThread_.get()); receiveThread_->startSink(); - if (activetParticipant) - videoMixer_->setActiveStream(callID_); + if (activeStream) + videoMixer_->setActiveStream(streamId_); } videoMixer_.reset(); diff --git a/src/media/video/video_rtp_session.h b/src/media/video/video_rtp_session.h index 16277f51aa1..afa05f08910 100644 --- a/src/media/video/video_rtp_session.h +++ b/src/media/video/video_rtp_session.h @@ -70,7 +70,7 @@ class VideoRtpSession : public RtpSession public: using BaseType = RtpSession; - VideoRtpSession(const std::string& callID, const DeviceParams& localVideoParams); + VideoRtpSession(const std::string& callId, const std::string& streamId, const DeviceParams& localVideoParams); ~VideoRtpSession(); void setRequestKeyFrameCallback(std::function cb); diff --git a/src/sip/sip_utils.cpp b/src/sip/sip_utils.cpp index 31d295a4f6c..0787db5243f 100644 --- a/src/sip/sip_utils.cpp +++ b/src/sip/sip_utils.cpp @@ -293,6 +293,14 @@ sip_strerror(pj_status_t code) return std::string {ret.ptr, ret.ptr + ret.slen}; } +std::string +streamId(const std::string& callId, uint32_t idx, MediaType mt) +{ + if (callId.empty()) + return fmt::format("host_{}_{}", (mt == MediaType::MEDIA_VIDEO ? "video" : "audio"), idx); + return fmt::format("{}_{}_{}", callId, (mt == MediaType::MEDIA_VIDEO ? 
"video" : "audio"), idx); +} + void sockaddr_to_host_port(pj_pool_t* pool, pjsip_host_port* host_port, const pj_sockaddr* addr) { diff --git a/src/sip/sip_utils.h b/src/sip/sip_utils.h index e4c1911f203..974e3a76442 100644 --- a/src/sip/sip_utils.h +++ b/src/sip/sip_utils.h @@ -143,6 +143,9 @@ as_view(const pj_str_t& str) noexcept return {str.ptr, (size_t) str.slen}; } +std::string +streamId(const std::string& callId, uint32_t idx, MediaType mt); + // PJSIP dialog locking in RAII way // Usage: declare local variable like this: sip_utils::PJDialogLock lock {dialog}; // The lock is kept until the local variable is deleted diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp index aeb7ded93c0..aebbc9f98b2 100644 --- a/src/sip/sipcall.cpp +++ b/src/sip/sipcall.cpp @@ -175,12 +175,28 @@ SIPCall::createRtpSession(RtpStream& stream) if (not stream.mediaAttribute_) throw std::runtime_error("Missing media attribute"); + // Find idx of this stream as we can share several audio/videos + auto streamIdx = [&]() { + auto idx = 0; + for (const auto& st : rtpStreams_) { + if (st.mediaAttribute_->label_ == stream.mediaAttribute_->label_) + return idx; + if (st.mediaAttribute_->type_ == stream.mediaAttribute_->type_) + idx++; + } + return -1; + }; + + // To get audio_0 ; video_0 + auto idx = streamIdx(); + auto streamId = sip_utils::streamId(id_, idx, stream.mediaAttribute_->type_); if (stream.mediaAttribute_->type_ == MediaType::MEDIA_AUDIO) { - stream.rtpSession_ = std::make_shared(id_); + stream.rtpSession_ = std::make_shared(id_, + streamId); } #ifdef ENABLE_VIDEO else if (stream.mediaAttribute_->type_ == MediaType::MEDIA_VIDEO) { - stream.rtpSession_ = std::make_shared(id_, getVideoSettings()); + stream.rtpSession_ = std::make_shared(id_, streamId, getVideoSettings()); std::static_pointer_cast(stream.rtpSession_)->setRotation(rotation_); } #endif @@ -1044,11 +1060,7 @@ SIPCall::hangup(int reason) // Stop all RTP streams stopAllMedia(); - if (auto conf = getConference()) { - if (auto mixer = conf->getVideoMixer()) { - mixer->removeAudioOnlySource(getCallId()); - } - } + detachAudioFromConference(); setState(Call::ConnectionState::DISCONNECTED, reason); dht::ThreadPool::io().run([w = weak()] { if (auto shared = w.lock()) @@ -1056,6 +1068,20 @@ SIPCall::hangup(int reason) }); } +void +SIPCall::detachAudioFromConference() +{ + if (auto conf = getConference()) { + if (auto mixer = conf->getVideoMixer()) { + for (const auto& stream : rtpStreams_) { + if (stream.mediaAttribute_->type_ == MediaType::MEDIA_AUDIO) { + mixer->removeAudioOnlySource(getCallId(), stream.rtpSession_->streamId()); + } + } + } + } +} + void SIPCall::refuse() { @@ -1404,12 +1430,7 @@ SIPCall::peerHungup() if (inviteSession_) terminateSipSession(PJSIP_SC_NOT_FOUND); - if (auto conf = getConference()) { - if (auto mixer = conf->getVideoMixer()) { - mixer->removeAudioOnlySource(getCallId()); - } - } - + detachAudioFromConference(); Call::peerHungup(); } diff --git a/src/sip/sipcall.h b/src/sip/sipcall.h index 7ec0f2bb23d..4278c60a05f 100644 --- a/src/sip/sipcall.h +++ b/src/sip/sipcall.h @@ -485,6 +485,7 @@ class SIPCall : public Call {"v:remote", false}}; void resetMediaReady(); + void detachAudioFromConference(); std::mutex setupSuccessMutex_; #ifdef ENABLE_VIDEO