diff --git a/src/client/callmanager.cpp b/src/client/callmanager.cpp
index 9735501e726..11a65f1d324 100644
--- a/src/client/callmanager.cpp
+++ b/src/client/callmanager.cpp
@@ -578,14 +578,7 @@ setActiveStream(const std::string& accountId,
 {
     if (const auto account = jami::Manager::instance().getAccount(accountId)) {
         if (auto conf = account->getConference(confId)) {
-            // TODO, as for now the videoMixer doesn't have the streamId
-            if (deviceId == account->currentDeviceId() && accountUri == account->getUsername()) {
-                conf->setActiveStream("", state);
-            } else if (auto call = std::static_pointer_cast(
-                           conf->getCallFromPeerID(accountUri))) {
-                conf->setActiveStream(call->getCallId(), state);
-            }
-            // conf->setActiveStream(streamId, state);
+            conf->setActiveStream(streamId, state);
         } else if (auto call = account->getCall(confId)) {
             if (call->conferenceProtocolVersion() == 1) {
                 Json::Value sinkVal;
diff --git a/src/conference.cpp b/src/conference.cpp
index 71f077aa1c1..7583bcf3cf8 100644
--- a/src/conference.cpp
+++ b/src/conference.cpp
@@ -109,14 +109,14 @@ Conference::Conference(const std::shared_ptr<Account>& account)
             }
             auto hostAdded = false;
             // Handle participants showing their video
-            std::unique_lock lk(shared->videoToCallMtx_);
             for (const auto& info : infos) {
                 std::string uri {};
                 bool isLocalMuted = false;
                 std::string deviceId {};
                 auto active = false;
-                if (!info.id.empty()) {
-                    if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(info.id))) {
+                if (!info.callId.empty()) {
+                    std::string callId = info.callId;
+                    if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(callId))) {
                         uri = call->getPeerNumber();
                         isLocalMuted = call->isPeerMuted();
                         if (auto* transport = call->getTransport())
@@ -125,13 +125,12 @@ Conference::Conference(const std::shared_ptr<Account>& account)
                     std::string_view peerId = string_remove_suffix(uri, '@');
                     auto isModerator = shared->isModerator(peerId);
                     auto isHandRaised = shared->isHandRaised(deviceId);
-                    auto isModeratorMuted = shared->isMuted(info.id);
-                    auto sinkId = shared->getConfId() + peerId;
+                    auto isModeratorMuted = shared->isMuted(callId);
                     if (auto videoMixer = shared->videoMixer_)
-                        active = videoMixer->verifyActive(info.id); // TODO streamId
+                        active = videoMixer->verifyActive(info.streamId);
                     newInfo.emplace_back(ParticipantInfo {std::move(uri),
                                                           deviceId,
-                                                          std::move(sinkId),
+                                                          std::move(info.streamId),
                                                           active,
                                                           info.x,
                                                           info.y,
@@ -143,30 +142,31 @@ Conference::Conference(const std::shared_ptr<Account>& account)
                                                           isModerator,
                                                           isHandRaised});
                 } else {
-                    auto it = shared->videoToCall_.find(info.source);
-                    if (it == shared->videoToCall_.end())
-                        it = shared->videoToCall_.emplace_hint(it, info.source, std::string());
-                    // If not local
                     auto isModeratorMuted = false;
-                    if (!it->second.empty()) {
+                    // If not local
+                    auto streamInfo = shared->videoMixer_->streamInfo(info.source);
+                    std::string streamId = streamInfo.streamId;
+                    if (!streamId.empty()) {
                         // Retrieve calls participants
                         // TODO: this is a first version, we assume that the peer is not
                         // a master of a conference and there is only one remote
                         // In the future, we should retrieve confInfo from the call
                         // To merge layouts informations
-                        // TODO sinkId
-                        auto isModeratorMuted = shared->isMuted(it->second);
+                        isModeratorMuted = shared->isMuted(streamId);
                         if (auto videoMixer = shared->videoMixer_)
-                            active = videoMixer->verifyActive(it->second);
-                        if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(it->second))) {
+                            active = videoMixer->verifyActive(streamId);
+                        if (auto call = std::dynamic_pointer_cast<SIPCall>(getCall(streamInfo.callId))) {
                             uri = call->getPeerNumber();
                            isLocalMuted = call->isPeerMuted();
                            if (auto* transport = call->getTransport())
                                deviceId = transport->deviceId();
                        }
+                    } else {
+                        streamId = sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO);
+                        if (auto videoMixer = shared->videoMixer_)
+                            active = videoMixer->verifyActive(streamId);
                     }
                     std::string_view peerId = string_remove_suffix(uri, '@');
-                    // TODO (another patch): use deviceId instead of peerId as specified in protocol
                     auto isModerator = shared->isModerator(peerId);
                     if (uri.empty() && !hostAdded) {
                         hostAdded = true;
@@ -175,12 +175,9 @@ Conference::Conference(const std::shared_ptr<Account>& account)
                         isLocalMuted = shared->isMediaSourceMuted(MediaType::MEDIA_AUDIO);
                     }
                     auto isHandRaised = shared->isHandRaised(deviceId);
-                    auto sinkId = shared->getConfId() + peerId;
-                    if (auto videoMixer = shared->videoMixer_)
-                        active |= videoMixer->verifyActive(info.source);
                     newInfo.emplace_back(ParticipantInfo {std::move(uri),
                                                           deviceId,
-                                                          std::move(sinkId),
+                                                          std::move(streamId),
                                                           active,
                                                           info.x,
                                                           info.y,
@@ -197,7 +194,6 @@ Conference::Conference(const std::shared_ptr<Account>& account)
                 newInfo.h = videoMixer->getHeight();
                 newInfo.w = videoMixer->getWidth();
             }
-            lk.unlock();
             if (!hostAdded) {
                 ParticipantInfo pi;
                 pi.videoMuted = true;
@@ -218,10 +214,8 @@ Conference::Conference(const std::shared_ptr<Account>& account)
         });
     parser_.onRaiseHand(
         [&](const auto& deviceId, bool state) { setHandRaised(deviceId, state); });
-    parser_.onSetActiveStream([&](const auto& accountUri, const auto& deviceId, bool state) {
-        // TODO replace per streamId
-        if (auto call = getCallWith(accountUri, deviceId))
-            setHandRaised(call->getCallId(), state);
+    parser_.onSetActiveStream([&](const auto& streamId, bool state) {
+        setActiveStream(streamId, state);
     });
     parser_.onMuteStreamAudio
         (
@@ -616,11 +610,12 @@ Conference::handleMediaChangeRequest(const std::shared_ptr<Call>& call,

 #ifdef ENABLE_VIDEO
     // If the new media list has video, remove the participant from audioonlylist.
-    if (MediaAttribute::hasMediaType(MediaAttribute::buildMediaAttributesList(remoteMediaList,
-                                                                              false),
-                                     MediaType::MEDIA_VIDEO)) {
-        if (videoMixer_)
-            videoMixer_->removeAudioOnlySource(call->getCallId());
+    if (videoMixer_ && MediaAttribute::hasMediaType(
+            MediaAttribute::buildMediaAttributesList(remoteMediaList, false),
+            MediaType::MEDIA_VIDEO)) {
+        auto callId = call->getCallId();
+        videoMixer_->removeAudioOnlySource(callId,
+                                           std::string(sip_utils::streamId(callId, 0, MediaType::MEDIA_VIDEO)));
     }
 #endif

@@ -708,10 +703,8 @@ Conference::addParticipant(const std::string& participant_id)
         // In conference, if a participant joins with an audio only
         // call, it must be listed in the audioonlylist.
         auto mediaList = call->getMediaAttributeList();
-        if (not MediaAttribute::hasMediaType(mediaList, MediaType::MEDIA_VIDEO)) {
-            if (videoMixer_) {
-                videoMixer_->addAudioOnlySource(call->getCallId());
-            }
+        if (videoMixer_ && not MediaAttribute::hasMediaType(mediaList, MediaType::MEDIA_VIDEO)) {
+            videoMixer_->addAudioOnlySource(call->getCallId(), sip_utils::streamId(call->getCallId(), 0, MediaType::MEDIA_AUDIO));
         }
         call->enterConference(shared_from_this());
         // Continue the recording for the conference if one participant was recording
@@ -739,14 +732,11 @@ Conference::setActiveParticipant(const std::string& participant_id)
     if (!videoMixer_)
         return;
     if (isHost(participant_id)) {
-        videoMixer_->addActiveHost();
+        videoMixer_->setActiveStream(sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO));
         return;
     }
     if (auto call = getCallFromPeerID(participant_id)) {
-        if (auto videoRecv = call->getReceiveVideoFrameActiveWriter())
-            videoMixer_->setActiveStream(videoRecv.get());
-        else
-            videoMixer_->setActiveStream(call->getCallId());
+        videoMixer_->setActiveStream(sip_utils::streamId(call->getCallId(), 0, MediaType::MEDIA_VIDEO));
         return;
     }

@@ -764,25 +754,13 @@
 void
 Conference::setActiveStream(const std::string& streamId, bool state)
 {
-    // TODO BUG: for now activeStream is the callId, and should be the sink!
 #ifdef ENABLE_VIDEO
     if (!videoMixer_)
         return;
-    if (state) {
-        // TODO remove
-        if (streamId.empty()) {
-            videoMixer_->addActiveHost();
-        } else if (auto call = getCall(streamId)) {
-            if (auto videoRecv = call->getReceiveVideoFrameActiveWriter())
-                videoMixer_->setActiveStream(videoRecv.get());
-            else
-                videoMixer_->setActiveStream(call->getCallId());
-            return;
-        }
-        // TODO videoMixer_->setActiveStream(sinkId);
-    } else {
+    if (state)
+        videoMixer_->setActiveStream(streamId);
+    else
         videoMixer_->resetActiveStream();
-    }
 #endif
 }

@@ -872,27 +850,6 @@ Conference::createSinks(const ConfInfo& infos)
                                               sink),
                                           confSinksMap_);
 }
-
-void
-Conference::attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const std::string& callId)
-{
-    JAMI_DBG("[conf:%s] attaching video of call %s", id_.c_str(), callId.c_str());
-    std::lock_guard lk(videoToCallMtx_);
-    videoToCall_.emplace(frame, callId);
-    frame->attach(videoMixer_.get());
-}
-
-void
-Conference::detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame)
-{
-    std::lock_guard lk(videoToCallMtx_);
-    auto it = videoToCall_.find(frame);
-    if (it != videoToCall_.end()) {
-        JAMI_DBG("[conf:%s] detaching video of call %s", id_.c_str(), it->second.c_str());
-        it->first->detach(videoMixer_.get());
-        videoToCall_.erase(it);
-    }
-}
 #endif

 void
@@ -911,8 +868,8 @@ Conference::removeParticipant(const std::string& participant_id)
 #ifdef ENABLE_VIDEO
         auto sinkId = getConfId() + peerId;
         // Remove if active
-        // TODO if (videoMixer_->verifyActive(sinkId))
-        if (videoMixer_->verifyActive(participant_id))
+        // TODO all streams
+        if (videoMixer_->verifyActive(sip_utils::streamId(participant_id, 0, MediaType::MEDIA_VIDEO)))
             videoMixer_->resetActiveStream();
         call->exitConference();
         if (call->isPeerRecording())
@@ -1252,7 +1209,6 @@ Conference::setHandRaised(const std::string& deviceId, const bool& state)
             if (auto* transport = call->getTransport())
                 callDeviceId = transport->deviceId();
             if (deviceId == callDeviceId) {
-                JAMI_ERR() << "@@@X";
                 if (state and not isPeerRequiringAttention) {
                     JAMI_DBG("Raise %s hand", deviceId.c_str());
                     handsRaised_.emplace(deviceId);
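
The hunks above stop keying participant entries by conference id plus peer URI and key them by the per-stream id built by sip_utils::streamId() instead (the helper is added further down in this patch). A minimal standalone comparison of the two keys, with placeholder values (confId, peerId and callId are illustrative only, not taken from the patch):

    #include <cassert>
    #include <string>

    int main()
    {
        const std::string confId = "conf0001";
        const std::string peerId = "alice";    // peer URI with the trailing '@...' removed
        const std::string callId = "call0001"; // placeholder call id

        // Old key: one sink per participant, concatenating conference id and peer id.
        const std::string oldSinkId = confId + peerId;
        assert(oldSinkId == "conf0001alice");

        // New key: one entry per media stream of a call, e.g. "<callId>_video_0",
        // matching the format of sip_utils::streamId() introduced by this patch.
        const std::string newStreamId = callId + "_video_0";
        assert(newStreamId == "call0001_video_0");
        return 0;
    }
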
diff --git a/src/conference.h b/src/conference.h
index a992ab40d37..683a965bade 100644
--- a/src/conference.h
+++ b/src/conference.h
@@ -335,8 +335,6 @@ class Conference : public Recordable, public std::enable_shared_from_this<Conference>
-    void attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const std::string& callId);
-    void detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame);
     std::shared_ptr<video::VideoMixer> getVideoMixer();
     std::string getVideoInput() const { return hostVideoSource_.sourceUri_; }
 #endif
@@ -400,9 +398,6 @@ class Conference : public Recordable, public std::enable_shared_from_this<Conference>
-    std::mutex videoToCallMtx_;
-    std::map<Observable<std::shared_ptr<MediaFrame>>*, std::string> videoToCall_ {};
     std::shared_ptr<RingBuffer> ghostRingBuffer_;

 #ifdef ENABLE_VIDEO
diff --git a/src/conference_protocol.cpp b/src/conference_protocol.cpp
index 93a28b3a414..78d5d6de482 100644
--- a/src/conference_protocol.cpp
+++ b/src/conference_protocol.cpp
@@ -158,9 +158,7 @@ ConfProtocolParser::parseV1()
                                      mediaVal[ProtocolKeys::MUTEAUDIO].asBool());
                 }
                 if (mediaVal.isMember(ProtocolKeys::ACTIVE)) {
-                    // TODO streamId
-                    setActiveStream_(accountUri,
-                                     deviceId,
+                    setActiveStream_(streamId,
                                      mediaVal[ProtocolKeys::ACTIVE].asBool());
                 }
             }
diff --git a/src/conference_protocol.h b/src/conference_protocol.h
index cee480826f3..f43e13074ff 100644
--- a/src/conference_protocol.h
+++ b/src/conference_protocol.h
@@ -59,10 +59,7 @@ class ConfProtocolParser {
     {
         raiseHand_ = std::move(cb);
     }
-    /**
-     * @todo, replace the 2 strings per streamId
-     */
-    void onSetActiveStream(std::function<void(const std::string&, const std::string&, bool)>&& cb)
+    void onSetActiveStream(std::function<void(const std::string&, bool)>&& cb)
     {
         setActiveStream_ = std::move(cb);
     }
@@ -125,7 +122,7 @@ class ConfProtocolParser {
     std::function checkAuthorization_;
     std::function hangupParticipant_;
     std::function raiseHand_;
-    std::function<void(const std::string&, const std::string&, bool)> setActiveStream_;
+    std::function<void(const std::string&, bool)> setActiveStream_;
     std::function muteStreamAudio_;
     std::function
diff --git a/src/media/audio/audio_rtp_session.cpp b/src/media/audio/audio_rtp_session.cpp
index 49bca44e7f6..928117d117b 100644
--- a/src/media/audio/audio_rtp_session.cpp
+++ b/src/media/audio/audio_rtp_session.cpp
@@ -48,21 +48,21 @@

 namespace jami {

-AudioRtpSession::AudioRtpSession(const std::string& id)
-    : RtpSession(id, MediaType::MEDIA_AUDIO)
+AudioRtpSession::AudioRtpSession(const std::string& callId, const std::string& streamId)
+    : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
     , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
 {
-    JAMI_DBG("Created Audio RTP session: %p - call Id %s", this, callID_.c_str());
+    JAMI_DBG("Created Audio RTP session: %p - call Id %s", this, callId_.c_str());

     // don't move this into the initializer list or Cthulus will emerge
-    ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(callID_);
+    ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(callId_);
 }

 AudioRtpSession::~AudioRtpSession()
 {
     stop();
-    JAMI_DBG("Destroyed Audio RTP session: %p - call Id %s", this, callID_.c_str());
+    JAMI_DBG("Destroyed Audio RTP session: %p - call Id %s", this, callId_.c_str());
 }

 void
@@ -90,7 +90,7 @@ AudioRtpSession::startSender()
         audioInput_->detach(sender_.get());

     // sender sets up input correctly, we just keep a reference in case startSender is called
-    audioInput_ = jami::getAudioInput(callID_);
+    audioInput_ = jami::getAudioInput(callId_);
     audioInput_->setMuted(muteState_);
     audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
     auto newParams = audioInput_->switchInput(input_);
@@ -117,7 +117,7 @@ AudioRtpSession::startSender()
         sender_.reset();
         socketPair_->stopSendOp(false);
         sender_.reset(
-            new AudioSender(callID_, getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
+            new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
     } catch (const MediaEncoderException& e) {
         JAMI_ERR("%s", e.what());
         send_.enabled = false;
     }
@@ -160,7 +160,7 @@ AudioRtpSession::startReceiver()
         JAMI_WARN("Restarting audio receiver");

     auto accountAudioCodec = std::static_pointer_cast<AccountAudioCodecInfo>(receive_.codec);
-    receiveThread_.reset(new AudioReceiveThread(callID_,
+    receiveThread_.reset(new AudioReceiveThread(callId_,
                                                 accountAudioCodec->audioformat,
                                                 receive_.receiving_sdp,
                                                 mtu_));
@@ -332,7 +332,7 @@ AudioRtpSession::initRecorder(std::shared_ptr<MediaRecorder>& rec)
 {
     if (receiveThread_)
         receiveThread_->attach(rec->addStream(receiveThread_->getInfo()));
-    if (auto input = jami::getAudioInput(callID_))
+    if (auto input = jami::getAudioInput(callId_))
         input->attach(rec->addStream(input->getInfo()));
 }

@@ -344,7 +344,7 @@ AudioRtpSession::deinitRecorder(std::shared_ptr<MediaRecorder>& rec)
             receiveThread_->detach(ob);
         }
     }
-    if (auto input = jami::getAudioInput(callID_)) {
+    if (auto input = jami::getAudioInput(callId_)) {
         if (auto ob = rec->getStream(input->getInfo().name)) {
             input->detach(ob);
         }
diff --git a/src/media/audio/audio_rtp_session.h b/src/media/audio/audio_rtp_session.h
index fb04899c1bf..9385b26021b 100644
--- a/src/media/audio/audio_rtp_session.h
+++ b/src/media/audio/audio_rtp_session.h
@@ -50,7 +50,7 @@ struct RTCPInfo
 class AudioRtpSession : public RtpSession
 {
 public:
-    AudioRtpSession(const std::string& id);
+    AudioRtpSession(const std::string& callId, const std::string& streamId);
     virtual ~AudioRtpSession();

     void start(std::unique_ptr<IceSocket> rtp_sock, std::unique_ptr<IceSocket> rtcp_sock) override;
diff --git a/src/media/audio/audio_sender.cpp b/src/media/audio/audio_sender.cpp
index a67e3863be1..2f1122b4328 100644
--- a/src/media/audio/audio_sender.cpp
+++ b/src/media/audio/audio_sender.cpp
@@ -33,14 +33,12 @@

 namespace jami {

-AudioSender::AudioSender(const std::string& id,
-                         const std::string& dest,
+AudioSender::AudioSender(const std::string& dest,
                          const MediaDescription& args,
                          SocketPair& socketPair,
                          const uint16_t seqVal,
                          const uint16_t mtu)
-    : id_(id)
-    , dest_(dest)
+    : dest_(dest)
     , args_(args)
     , seqVal_(seqVal)
     , mtu_(mtu)
diff --git a/src/media/audio/audio_sender.h b/src/media/audio/audio_sender.h
index d48c53dccce..6dc58e3f6f7 100644
--- a/src/media/audio/audio_sender.h
+++ b/src/media/audio/audio_sender.h
@@ -37,8 +37,7 @@ class Resampler;
 class AudioSender : public Observer<std::shared_ptr<MediaFrame>>
 {
 public:
-    AudioSender(const std::string& id,
-                const std::string& dest,
+    AudioSender(const std::string& dest,
                 const MediaDescription& args,
                 SocketPair& socketPair,
                 const uint16_t seqVal,
@@ -56,7 +55,6 @@ class AudioSender : public Observer<std::shared_ptr<MediaFrame>>
     bool setup(SocketPair& socketPair);

-    std::string id_;
     std::string dest_;
     MediaDescription args_;
     std::unique_ptr<MediaEncoder> audioEncoder_;
diff --git a/src/media/rtp_session.h b/src/media/rtp_session.h
index 715b8841b3d..453befe8cdd 100644
--- a/src/media/rtp_session.h
+++ b/src/media/rtp_session.h
@@ -42,8 +42,10 @@ class RtpSession
     // Media direction
     enum class Direction { SEND, RECV };

-    RtpSession(const std::string& callID, MediaType type)
-        : callID_(callID)
+    // Note: callId is used for ring buffers and smarttools
+    RtpSession(const std::string& callId, const std::string& streamId, MediaType type)
+        : callId_(callId)
+        , streamId_(streamId)
         , mediaType_(type)
     {}
     virtual ~RtpSession() {};
@@ -82,9 +84,12 @@ class RtpSession
     const IpAddr& getSendAddr() const { return send_.addr; };
     const IpAddr& getRecvAddr() const { return receive_.addr; };

+    inline std::string streamId() const { return streamId_; }
+
 protected:
     std::recursive_mutex mutex_;
-    const std::string callID_;
+    const std::string callId_;
+    const std::string streamId_;
     MediaType mediaType_;
     std::unique_ptr<SocketPair> socketPair_;
     std::string input_ {};
diff --git a/src/media/video/video_mixer.cpp b/src/media/video/video_mixer.cpp
index d0f2515d91d..8ece82fbbd1 100644
--- a/src/media/video/video_mixer.cpp
+++ b/src/media/video/video_mixer.cpp
@@ -32,6 +32,7 @@
 #ifdef RING_ACCEL
 #include "accel.h"
 #endif
+#include "sip/sip_utils.h"

 #include
 #include
@@ -88,8 +89,7 @@ VideoMixer::VideoMixer(const std::string& id, const std::string& localInput)
     // Local video camera is the main participant
     if (not localInput.empty())
         videoLocal_ = getVideoInput(localInput);
-    if (videoLocal_)
-        videoLocal_->attach(this);
+    attachVideo(videoLocal_.get(), "", sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO));

     loop_.start();
     nextProcess_ = std::chrono::steady_clock::now();
@@ -100,16 +100,10 @@ VideoMixer::~VideoMixer()
 {
     stop_sink();

-    if (videoLocal_) {
-        videoLocal_->detach(this);
-        // prefer to release it now than after the next join
-        videoLocal_.reset();
-    }
-    if (videoLocalSecondary_) {
-        videoLocalSecondary_->detach(this);
-        // prefer to release it now than after the next join
-        videoLocalSecondary_.reset();
-    }
+    detachVideo(videoLocal_.get());
+    videoLocal_.reset();
+    detachVideo(videoLocalSecondary_.get());
+    videoLocalSecondary_.reset();

     loop_.join();

@@ -139,8 +133,7 @@ VideoMixer::switchInput(const std::string& input)

     // Re-attach videoInput to mixer
     videoLocal_ = getVideoInput(input);
-    if (videoLocal_)
-        videoLocal_->attach(this);
+    attachVideo(videoLocal_.get(), "", sip_utils::streamId("", 0, MediaType::MEDIA_VIDEO));
 }

 void
@@ -164,9 +157,7 @@ VideoMixer::switchSecondaryInput(const std::string& input)

     // Re-attach videoInput to mixer
     videoLocalSecondary_ = getVideoInput(input);
-    if (videoLocalSecondary_) {
-        videoLocalSecondary_->attach(this);
-    }
+    attachVideo(videoLocalSecondary_.get(), "", sip_utils::streamId("", 1, MediaType::MEDIA_VIDEO));
 }

 void
@@ -178,35 +169,50 @@ VideoMixer::stopInput()
 }

 void
-VideoMixer::addActiveHost()
+VideoMixer::setActiveStream(const std::string& id)
 {
-    activeStream_ = "";
-    activeSource_ = videoLocalSecondary_ ? videoLocalSecondary_.get() : videoLocal_.get();
+    activeStream_ = id;
     updateLayout();
 }

 void
-VideoMixer::setActiveStream(Observable<std::shared_ptr<MediaFrame>>* ob)
+VideoMixer::updateLayout()
 {
-    activeStream_ = "";
-    activeSource_ = ob;
-    updateLayout();
+    if (activeStream_ == "")
+        currentLayout_ = Layout::GRID;
+    layoutUpdated_ += 1;
 }

 void
-VideoMixer::setActiveStream(const std::string& id)
+VideoMixer::attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const std::string& callId, const std::string& streamId)
 {
-    activeSource_ = nullptr;
-    activeStream_ = id;
-    updateLayout();
+    if (!frame) return;
+    JAMI_DBG("Attaching video with streamId %s", streamId.c_str());
+    std::lock_guard lk(videoToStreamInfoMtx_);
+    videoToStreamInfo_.emplace(frame, StreamInfo {callId, streamId});
+    frame->attach(this);
 }

 void
-VideoMixer::updateLayout()
+VideoMixer::detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame)
 {
-    if (activeStream_ == "" && activeSource_ == nullptr)
-        currentLayout_ = Layout::GRID;
-    layoutUpdated_ += 1;
+    if (!frame)
+        return;
+    bool detach = false;
+    std::unique_lock lk(videoToStreamInfoMtx_);
+    auto it = videoToStreamInfo_.find(frame);
+    if (it != videoToStreamInfo_.end()) {
+        JAMI_DBG("Detaching video of call %s", it->second.callId.c_str());
+        detach = true;
+        // Handle the case where the current shown source leave the conference
+        // Note, do not call resetActiveStream() to avoid multiple updates
+        if (verifyActive(it->second.streamId))
+            activeStream_ = {};
+        videoToStreamInfo_.erase(it);
+    }
+    lk.unlock();
+    if (detach)
+        frame->detach(this);
 }

 void
@@ -230,9 +236,6 @@ VideoMixer::detached(Observable<std::shared_ptr<MediaFrame>>* ob)
     for (const auto& x : sources_) {
         if (x->source == ob) {
-            // Handle the case where the current shown source leave the conference
-            if (verifyActive(ob))
-                resetActiveStream();
             JAMI_DBG("Remove source [%p]", x.get());
             sources_.remove(x);
             JAMI_DBG("Total sources: %lu", sources_.size());
@@ -303,11 +306,10 @@ VideoMixer::process()
     std::vector<SourceInfo> sourcesInfo;
     sourcesInfo.reserve(sources_.size() + audioOnlySources_.size());
     // add all audioonlysources
-    for (auto& id : audioOnlySources_) {
-        JAMI_ERR() << "@@@ " << id;
-        auto active = verifyActive(id);
+    for (auto& [callId, streamId] : audioOnlySources_) {
+        auto active = verifyActive(streamId);
         if (currentLayout_ != Layout::ONE_BIG or active) {
-            sourcesInfo.emplace_back(SourceInfo {{}, 0, 0, 10, 10, false, id});
+            sourcesInfo.emplace_back(SourceInfo {{}, 0, 0, 10, 10, false, callId, streamId});
         }
         if (currentLayout_ == Layout::ONE_BIG and active)
             successfullyRendered = true;
@@ -318,7 +320,8 @@ VideoMixer::process()
         if (!loop_.isRunning())
             return;

-        auto activeSource = verifyActive(x->source);
+        auto sinfo = streamInfo(x->source);
+        auto activeSource = verifyActive(sinfo.streamId);
         if (currentLayout_ != Layout::ONE_BIG or activeSource) {
             // make rendered frame temporarily unavailable for update()
             // to avoid concurrent access.
@@ -385,8 +388,9 @@ VideoMixer::process()
     layoutUpdated_ -= 1;
     if (layoutUpdated_ == 0) {
         for (auto& x : sources_) {
+            auto sinfo = streamInfo(x->source);
             sourcesInfo.emplace_back(
-                SourceInfo {x->source, x->x, x->y, x->w, x->h, x->hasVideo, {}});
+                SourceInfo {x->source, x->x, x->y, x->w, x->h, x->hasVideo, sinfo.callId, sinfo.streamId});
         }
         if (onSourcesUpdated_)
             onSourcesUpdated_(std::move(sourcesInfo));
diff --git a/src/media/video/video_mixer.h b/src/media/video/video_mixer.h
index bbd982ddd40..6c014ea8c2f 100644
--- a/src/media/video/video_mixer.h
+++ b/src/media/video/video_mixer.h
@@ -37,6 +37,11 @@ namespace video {

 class SinkClient;

+struct StreamInfo {
+    std::string callId;
+    std::string streamId;
+};
+
 struct SourceInfo
 {
     Observable<std::shared_ptr<MediaFrame>>* source;
@@ -45,7 +50,8 @@ struct SourceInfo
     int w;
     int h;
     bool hasVideo;
-    std::string id;
+    std::string callId;
+    std::string streamId;
 };

 using OnSourcesUpdatedCb = std::function<void(std::vector<SourceInfo>&&)>;
@@ -73,27 +79,18 @@ class VideoMixer : public VideoGenerator, public VideoFramePassiveReader
     void switchSecondaryInput(const std::string& input);
     void stopInput();

-    void setActiveStream(Observable<std::shared_ptr<MediaFrame>>* ob);
     void setActiveStream(const std::string& id);
     void resetActiveStream()
     {
         activeStream_ = {};
-        activeSource_ = {};
         updateLayout();
     }

-    void addActiveHost();
-    // TODO group, we can only use a set of string to identify actives
     bool verifyActive(const std::string& id) { return activeStream_ == id; }
-    bool verifyActive(Observable<std::shared_ptr<MediaFrame>>* ob)
-    {
-        return activeSource_ == ob;
-    }
-
     void setVideoLayout(Layout newLayout)
     {
         currentLayout_ = newLayout;
@@ -114,18 +111,33 @@ class VideoMixer : public VideoGenerator, public VideoFramePassiveReader

     std::shared_ptr<SinkClient>& getSink() { return sink_; }

-    void addAudioOnlySource(const std::string& id)
+    void addAudioOnlySource(const std::string& callId, const std::string& streamId)
     {
-        std::lock_guard lk(audioOnlySourcesMtx_);
-        audioOnlySources_.insert(id);
+        std::unique_lock lk(audioOnlySourcesMtx_);
+        audioOnlySources_.insert({callId, streamId});
+        lk.unlock();
         updateLayout();
     }

-    void removeAudioOnlySource(const std::string& id)
+    void removeAudioOnlySource(const std::string& callId, const std::string& streamId)
     {
-        std::lock_guard lk(audioOnlySourcesMtx_);
-        if (audioOnlySources_.erase(id))
+        std::unique_lock lk(audioOnlySourcesMtx_);
+        if (audioOnlySources_.erase({callId, streamId})) {
+            lk.unlock();
             updateLayout();
+        }
+    }
+
+    void attachVideo(Observable<std::shared_ptr<MediaFrame>>* frame, const std::string& callId, const std::string& streamId);
+    void detachVideo(Observable<std::shared_ptr<MediaFrame>>* frame);
+
+    StreamInfo streamInfo(Observable<std::shared_ptr<MediaFrame>>* frame) const
+    {
+        std::lock_guard lk(videoToStreamInfoMtx_);
+        auto it = videoToStreamInfo_.find(frame);
+        if (it == videoToStreamInfo_.end())
+            return {};
+        return it->second;
     }

 private:
@@ -162,11 +174,14 @@ class VideoMixer : public VideoGenerator, public VideoFramePassiveReader
     ThreadLoop loop_; // as to be last member

     Layout currentLayout_ {Layout::GRID};
-    Observable<std::shared_ptr<MediaFrame>>* activeSource_ {nullptr};
     std::list<std::unique_ptr<VideoMixerSource>> sources_;

+    // We need to convert call to frame
+    mutable std::mutex videoToStreamInfoMtx_ {};
+    std::map<Observable<std::shared_ptr<MediaFrame>>*, StreamInfo> videoToStreamInfo_ {};
+
     std::mutex audioOnlySourcesMtx_;
-    std::set<std::string> audioOnlySources_;
+    std::set<std::pair<std::string, std::string>> audioOnlySources_;

     std::string activeStream_ {};
     std::atomic_int layoutUpdated_ {0};
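
For the VideoMixer changes above: the frame-source-to-stream bookkeeping that Conference previously kept in videoToCall_ now lives in the mixer as a map from the observable frame source to a {callId, streamId} pair. The sketch below is a reduced, self-contained model of that registry (the Source alias stands in for Observable<std::shared_ptr<MediaFrame>>*; it is an illustration of the bookkeeping, not the real class):

    #include <cassert>
    #include <map>
    #include <mutex>
    #include <string>

    struct StreamInfo {
        std::string callId;
        std::string streamId;
    };

    // Stand-in for Observable<std::shared_ptr<MediaFrame>>*: any stable pointer works as a key.
    using Source = const void*;

    class StreamRegistry {
    public:
        void attachVideo(Source frame, const std::string& callId, const std::string& streamId)
        {
            std::lock_guard<std::mutex> lk(mtx_);
            infos_.emplace(frame, StreamInfo {callId, streamId});
        }

        void detachVideo(Source frame)
        {
            std::lock_guard<std::mutex> lk(mtx_);
            auto it = infos_.find(frame);
            if (it != infos_.end()) {
                // If the stream being detached is the active one, drop the selection,
                // mirroring what VideoMixer::detachVideo does in the patch.
                if (activeStream_ == it->second.streamId)
                    activeStream_.clear();
                infos_.erase(it);
            }
        }

        StreamInfo streamInfo(Source frame) const
        {
            std::lock_guard<std::mutex> lk(mtx_);
            auto it = infos_.find(frame);
            return it == infos_.end() ? StreamInfo {} : it->second;
        }

        void setActiveStream(const std::string& id) { activeStream_ = id; }

    private:
        mutable std::mutex mtx_;
        std::map<Source, StreamInfo> infos_;
        std::string activeStream_;
    };

    int main()
    {
        int frame = 0; // placeholder for a frame source
        StreamRegistry reg;
        reg.attachVideo(&frame, "call0001", "call0001_video_0");
        reg.setActiveStream("call0001_video_0");
        assert(reg.streamInfo(&frame).callId == "call0001");
        reg.detachVideo(&frame);
        assert(reg.streamInfo(&frame).streamId.empty());
        return 0;
    }
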
diff --git a/src/media/video/video_rtp_session.cpp b/src/media/video/video_rtp_session.cpp
index df09ddbecac..b91f5198799 100644
--- a/src/media/video/video_rtp_session.cpp
+++ b/src/media/video/video_rtp_session.cpp
@@ -59,15 +59,17 @@ constexpr auto EXPIRY_TIME_RTCP = std::chrono::seconds(2);
 constexpr auto DELAY_AFTER_REMB_INC = std::chrono::seconds(1);
 constexpr auto DELAY_AFTER_REMB_DEC = std::chrono::milliseconds(500);

-VideoRtpSession::VideoRtpSession(const string& callID, const DeviceParams& localVideoParams)
-    : RtpSession(callID, MediaType::MEDIA_VIDEO)
+VideoRtpSession::VideoRtpSession(const string& callId,
+                                 const string& streamId,
+                                 const DeviceParams& localVideoParams)
+    : RtpSession(callId, streamId, MediaType::MEDIA_VIDEO)
     , localVideoParams_(localVideoParams)
     , videoBitrateInfo_ {}
     , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
 {
     setupVideoBitrateInfo(); // reset bitrate
     cc = std::make_unique<CongestionControl>();
-    JAMI_DBG("[%p] Video RTP session created for call %s", this, callID_.c_str());
+    JAMI_DBG("[%p] Video RTP session created for call %s", this, callId_.c_str());
 }

 VideoRtpSession::~VideoRtpSession()
@@ -247,7 +249,7 @@ VideoRtpSession::startReceiver()
         if (receiveThread_)
             JAMI_WARN("[%p] Already has a receiver, restarting", this);
         receiveThread_.reset(
-            new VideoReceiveThread(callID_, !conference_, receive_.receiving_sdp, mtu_));
+            new VideoReceiveThread(callId_, !conference_, receive_.receiving_sdp, mtu_));

         // XXX keyframe requests can timeout if unanswered
         receiveThread_->addIOContext(*socketPair_);
@@ -255,23 +257,27 @@ VideoRtpSession::startReceiver()
         receiveThread_->startLoop();
         receiveThread_->setRequestKeyFrameCallback([this]() { cbKeyFrameRequest_(); });
         receiveThread_->setRotation(rotation_.load());
-        if (videoMixer_) {
-            auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get())
-                                     || videoMixer_->verifyActive(callID_);
-            videoMixer_->removeAudioOnlySource(callID_);
-            if (activeParticipant)
-                videoMixer_->setActiveStream(receiveThread_.get());
+        if (videoMixer_ and conference_) {
+            // Note, this should be managed differently, this is a bit hacky
+            auto audioId = streamId_;
+            string_replace(audioId, "video", "audio");
+            auto activeStream = videoMixer_->verifyActive(audioId);
+            videoMixer_->removeAudioOnlySource(callId_, audioId);
+            if (activeStream)
+                videoMixer_->setActiveStream(streamId_);
         }
     } else {
         JAMI_DBG("[%p] Video receiver disabled", this);
-        if (receiveThread_ and videoMixer_) {
-            auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get())
-                                     || videoMixer_->verifyActive(callID_);
-            videoMixer_->addAudioOnlySource(callID_);
+        if (receiveThread_ and videoMixer_ and conference_) {
+            // Note, this should be managed differently, this is a bit hacky
+            auto audioId_ = streamId_;
+            string_replace(audioId_, "video", "audio");
+            auto activeStream = videoMixer_->verifyActive(streamId_);
+            videoMixer_->addAudioOnlySource(callId_, audioId_);
             receiveThread_->detach(videoMixer_.get());
-            if (activeParticipant)
-                videoMixer_->setActiveStream(callID_);
+            if (activeStream)
+                videoMixer_->setActiveStream(audioId_);
         }
     }
     if (socketPair_)
@@ -289,11 +295,13 @@ VideoRtpSession::stopReceiver()
         return;

     if (videoMixer_) {
-        auto activeParticipant = videoMixer_->verifyActive(receiveThread_.get()) || videoMixer_->verifyActive(callID_);
-        videoMixer_->addAudioOnlySource(callID_);
+        auto activeStream = videoMixer_->verifyActive(streamId_);
+        auto audioId = streamId_;
+        string_replace(audioId, "video", "audio");
+        videoMixer_->addAudioOnlySource(callId_, audioId);
         receiveThread_->detach(videoMixer_.get());
-        if (activeParticipant)
-            videoMixer_->setActiveStream(callID_);
+        if (activeStream)
+            videoMixer_->setActiveStream(audioId);
     }

     // We need to disable the read operation, otherwise the
@@ -463,7 +471,7 @@ VideoRtpSession::setupConferenceVideoPipeline(Conference& conference, Direction
         JAMI_DBG("[%p] Setup video sender pipeline on conference %s for call %s",
                  this,
                  conference.getConfId().c_str(),
-                 callID_.c_str());
+                 callId_.c_str());
         videoMixer_ = conference.getVideoMixer();
         if (sender_) {
             // Swap sender from local video to conference video mixer
@@ -478,10 +486,11 @@ VideoRtpSession::setupConferenceVideoPipeline(Conference& conference, Direction
         JAMI_DBG("[%p] Setup video receiver pipeline on conference %s for call %s",
                  this,
                  conference.getConfId().c_str(),
-                 callID_.c_str());
+                 callId_.c_str());
         if (receiveThread_) {
             receiveThread_->stopSink();
-            conference.attachVideo(receiveThread_.get(), callID_);
+            if (videoMixer_)
+                videoMixer_->attachVideo(receiveThread_.get(), callId_, streamId_);
         } else {
             JAMI_WARN("[%p] no receiver", this);
         }
@@ -545,11 +554,11 @@ VideoRtpSession::exitConference()
         videoMixer_->detach(sender_.get());

     if (receiveThread_) {
-        auto activetParticipant = videoMixer_->verifyActive(receiveThread_.get());
-        conference_->detachVideo(receiveThread_.get());
+        auto activeStream = videoMixer_->verifyActive(streamId_);
+        videoMixer_->detachVideo(receiveThread_.get());
         receiveThread_->startSink();
-        if (activetParticipant)
-            videoMixer_->setActiveStream(callID_);
+        if (activeStream)
+            videoMixer_->setActiveStream(streamId_);
     }

     videoMixer_.reset();
diff --git a/src/media/video/video_rtp_session.h b/src/media/video/video_rtp_session.h
index 16277f51aa1..afa05f08910 100644
--- a/src/media/video/video_rtp_session.h
+++ b/src/media/video/video_rtp_session.h
@@ -70,7 +70,7 @@ class VideoRtpSession : public RtpSession
 public:
     using BaseType = RtpSession;

-    VideoRtpSession(const std::string& callID, const DeviceParams& localVideoParams);
+    VideoRtpSession(const std::string& callId, const std::string& streamId, const DeviceParams& localVideoParams);
     ~VideoRtpSession();

     void setRequestKeyFrameCallback(std::function<void(void)> cb);
diff --git a/src/sip/sip_utils.cpp b/src/sip/sip_utils.cpp
index 31d295a4f6c..0787db5243f 100644
--- a/src/sip/sip_utils.cpp
+++ b/src/sip/sip_utils.cpp
@@ -293,6 +293,14 @@ sip_strerror(pj_status_t code)
     return std::string {ret.ptr, ret.ptr + ret.slen};
 }

+std::string
+streamId(const std::string& callId, uint32_t idx, MediaType mt)
+{
+    if (callId.empty())
+        return fmt::format("host_{}_{}", (mt == MediaType::MEDIA_VIDEO ? "video" : "audio"), idx);
+    return fmt::format("{}_{}_{}", callId, (mt == MediaType::MEDIA_VIDEO ? "video" : "audio"), idx);
+}
+
 void
 sockaddr_to_host_port(pj_pool_t* pool, pjsip_host_port* host_port, const pj_sockaddr* addr)
 {
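
The helper above is the one place where stream ids are built: "host_<type>_<idx>" when the call id is empty (the local host) and "<callId>_<type>_<idx>" otherwise, which is also why video_rtp_session.cpp can derive the matching audio id with a simple string_replace of "video" by "audio". A standalone mirror of the formatting (std::to_string instead of fmt, purely to show the resulting names):

    #include <cassert>
    #include <string>

    enum class MediaType { MEDIA_AUDIO, MEDIA_VIDEO };

    // Mirrors sip_utils::streamId() from the hunk above: "host_<type>_<idx>" for the
    // local host (empty callId), "<callId>_<type>_<idx>" otherwise.
    static std::string streamId(const std::string& callId, unsigned idx, MediaType mt)
    {
        const std::string type = (mt == MediaType::MEDIA_VIDEO) ? "video" : "audio";
        if (callId.empty())
            return "host_" + type + "_" + std::to_string(idx);
        return callId + "_" + type + "_" + std::to_string(idx);
    }

    int main()
    {
        assert(streamId("", 0, MediaType::MEDIA_VIDEO) == "host_video_0");
        assert(streamId("call0001", 0, MediaType::MEDIA_AUDIO) == "call0001_audio_0");
        assert(streamId("call0001", 1, MediaType::MEDIA_VIDEO) == "call0001_video_1");
        return 0;
    }
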
"video" : "audio"), idx); +} + void sockaddr_to_host_port(pj_pool_t* pool, pjsip_host_port* host_port, const pj_sockaddr* addr) { diff --git a/src/sip/sip_utils.h b/src/sip/sip_utils.h index e4c1911f203..974e3a76442 100644 --- a/src/sip/sip_utils.h +++ b/src/sip/sip_utils.h @@ -143,6 +143,9 @@ as_view(const pj_str_t& str) noexcept return {str.ptr, (size_t) str.slen}; } +std::string +streamId(const std::string& callId, uint32_t idx, MediaType mt); + // PJSIP dialog locking in RAII way // Usage: declare local variable like this: sip_utils::PJDialogLock lock {dialog}; // The lock is kept until the local variable is deleted diff --git a/src/sip/sipcall.cpp b/src/sip/sipcall.cpp index aeb7ded93c0..aebbc9f98b2 100644 --- a/src/sip/sipcall.cpp +++ b/src/sip/sipcall.cpp @@ -175,12 +175,28 @@ SIPCall::createRtpSession(RtpStream& stream) if (not stream.mediaAttribute_) throw std::runtime_error("Missing media attribute"); + // Find idx of this stream as we can share several audio/videos + auto streamIdx = [&]() { + auto idx = 0; + for (const auto& st : rtpStreams_) { + if (st.mediaAttribute_->label_ == stream.mediaAttribute_->label_) + return idx; + if (st.mediaAttribute_->type_ == stream.mediaAttribute_->type_) + idx++; + } + return -1; + }; + + // To get audio_0 ; video_0 + auto idx = streamIdx(); + auto streamId = sip_utils::streamId(id_, idx, stream.mediaAttribute_->type_); if (stream.mediaAttribute_->type_ == MediaType::MEDIA_AUDIO) { - stream.rtpSession_ = std::make_shared(id_); + stream.rtpSession_ = std::make_shared(id_, + streamId); } #ifdef ENABLE_VIDEO else if (stream.mediaAttribute_->type_ == MediaType::MEDIA_VIDEO) { - stream.rtpSession_ = std::make_shared(id_, getVideoSettings()); + stream.rtpSession_ = std::make_shared(id_, streamId, getVideoSettings()); std::static_pointer_cast(stream.rtpSession_)->setRotation(rotation_); } #endif @@ -1044,11 +1060,7 @@ SIPCall::hangup(int reason) // Stop all RTP streams stopAllMedia(); - if (auto conf = getConference()) { - if (auto mixer = conf->getVideoMixer()) { - mixer->removeAudioOnlySource(getCallId()); - } - } + detachAudioFromConference(); setState(Call::ConnectionState::DISCONNECTED, reason); dht::ThreadPool::io().run([w = weak()] { if (auto shared = w.lock()) @@ -1056,6 +1068,20 @@ SIPCall::hangup(int reason) }); } +void +SIPCall::detachAudioFromConference() +{ + if (auto conf = getConference()) { + if (auto mixer = conf->getVideoMixer()) { + for (const auto& stream : rtpStreams_) { + if (stream.mediaAttribute_->type_ == MediaType::MEDIA_AUDIO) { + mixer->removeAudioOnlySource(getCallId(), stream.rtpSession_->streamId()); + } + } + } + } +} + void SIPCall::refuse() { @@ -1404,12 +1430,7 @@ SIPCall::peerHungup() if (inviteSession_) terminateSipSession(PJSIP_SC_NOT_FOUND); - if (auto conf = getConference()) { - if (auto mixer = conf->getVideoMixer()) { - mixer->removeAudioOnlySource(getCallId()); - } - } - + detachAudioFromConference(); Call::peerHungup(); } diff --git a/src/sip/sipcall.h b/src/sip/sipcall.h index 7ec0f2bb23d..4278c60a05f 100644 --- a/src/sip/sipcall.h +++ b/src/sip/sipcall.h @@ -485,6 +485,7 @@ class SIPCall : public Call {"v:remote", false}}; void resetMediaReady(); + void detachAudioFromConference(); std::mutex setupSuccessMutex_; #ifdef ENABLE_VIDEO