Skip to content
This repository was archived by the owner on Aug 23, 2022. It is now read-only.

Commit 5f386db

Browse files
authored
Merge pull request #3 from robashton/main
Wrap RtcpReader in fresh objects per call to bind_rtcp_reader in default interceptors
2 parents fd129ea + 9d88493 commit 5f386db

File tree

3 files changed

+26
-44
lines changed

3 files changed

+26
-44
lines changed

src/nack/responder/mod.rs

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
};
99
use responder_stream::ResponderStream;
1010

11-
use crate::error::{Error, Result};
11+
use crate::error::Result;
1212
use crate::nack::stream_support_nack;
1313

1414
use async_trait::async_trait;
@@ -44,7 +44,6 @@ impl InterceptorBuilder for ResponderBuilder {
4444
13 // 8192 = 1 << 13
4545
},
4646
streams: Arc::new(Mutex::new(HashMap::new())),
47-
parent_rtcp_reader: Mutex::new(None),
4847
}),
4948
}))
5049
}
@@ -53,7 +52,6 @@ impl InterceptorBuilder for ResponderBuilder {
5352
pub struct ResponderInternal {
5453
log2_size: u8,
5554
streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
56-
parent_rtcp_reader: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
5755
}
5856

5957
impl ResponderInternal {
@@ -92,27 +90,22 @@ impl ResponderInternal {
9290
}
9391
}
9492

93+
pub struct ResponderRtcpReader {
94+
parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
95+
internal: Arc<ResponderInternal>,
96+
}
97+
9598
#[async_trait]
96-
impl RTCPReader for ResponderInternal {
99+
impl RTCPReader for ResponderRtcpReader {
97100
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
98-
let (n, attr) = {
99-
let parent_rtcp_reader = {
100-
let parent_rtcp_reader = self.parent_rtcp_reader.lock().await;
101-
parent_rtcp_reader.clone()
102-
};
103-
if let Some(reader) = parent_rtcp_reader {
104-
reader.read(buf, a).await?
105-
} else {
106-
return Err(Error::ErrInvalidParentRtcpReader);
107-
}
108-
};
101+
let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? };
109102

110103
let mut b = &buf[..n];
111104
let pkts = rtcp::packet::unmarshal(&mut b)?;
112105
for p in &pkts {
113106
if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
114107
let nack = nack.clone();
115-
let streams = Arc::clone(&self.streams);
108+
let streams = Arc::clone(&self.internal.streams);
116109
tokio::spawn(async move {
117110
ResponderInternal::resend_packets(streams, nack).await;
118111
});
@@ -143,12 +136,10 @@ impl Interceptor for Responder {
143136
&self,
144137
reader: Arc<dyn RTCPReader + Send + Sync>,
145138
) -> Arc<dyn RTCPReader + Send + Sync> {
146-
{
147-
let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
148-
*parent_rtcp_reader = Some(reader);
149-
}
150-
151-
Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
139+
Arc::new(ResponderRtcpReader {
140+
internal: Arc::clone(&self.internal),
141+
parent_rtcp_reader: reader,
142+
}) as Arc<dyn RTCPReader + Send + Sync>
152143
}
153144

154145
/// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method

src/report/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ impl ReportBuilder {
4444
Duration::from_secs(1)
4545
},
4646
now: self.now.clone(),
47-
parent_rtcp_reader: Mutex::new(None),
4847
streams: Mutex::new(HashMap::new()),
4948
close_rx: Mutex::new(Some(close_rx)),
5049
}),

src/report/receiver/mod.rs

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,24 @@ use waitgroup::WaitGroup;
1515
pub(crate) struct ReceiverReportInternal {
1616
pub(crate) interval: Duration,
1717
pub(crate) now: Option<FnTimeGen>,
18-
pub(crate) parent_rtcp_reader: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
1918
pub(crate) streams: Mutex<HashMap<u32, Arc<ReceiverStream>>>,
2019
pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
2120
}
2221

22+
pub(crate) struct ReceiverReportRtcpReader {
23+
pub(crate) internal: Arc<ReceiverReportInternal>,
24+
pub(crate) parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
25+
}
26+
2327
#[async_trait]
24-
impl RTCPReader for ReceiverReportInternal {
28+
impl RTCPReader for ReceiverReportRtcpReader {
2529
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
26-
let (n, attr) = {
27-
let parent_rtcp_reader = {
28-
let parent_rtcp_reader = self.parent_rtcp_reader.lock().await;
29-
parent_rtcp_reader.clone()
30-
};
31-
if let Some(reader) = parent_rtcp_reader {
32-
reader.read(buf, a).await?
33-
} else {
34-
return Err(Error::ErrInvalidParentRtcpReader);
35-
}
36-
};
30+
let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? };
3731

3832
let mut b = &buf[..n];
3933
let pkts = rtcp::packet::unmarshal(&mut b)?;
4034

41-
let now = if let Some(f) = &self.now {
35+
let now = if let Some(f) = &self.internal.now {
4236
f().await
4337
} else {
4438
SystemTime::now()
@@ -50,7 +44,7 @@ impl RTCPReader for ReceiverReportInternal {
5044
.downcast_ref::<rtcp::sender_report::SenderReport>()
5145
{
5246
let stream = {
53-
let m = self.streams.lock().await;
47+
let m = self.internal.streams.lock().await;
5448
m.get(&sr.ssrc).cloned()
5549
};
5650
if let Some(stream) = stream {
@@ -136,12 +130,10 @@ impl Interceptor for ReceiverReport {
136130
&self,
137131
reader: Arc<dyn RTCPReader + Send + Sync>,
138132
) -> Arc<dyn RTCPReader + Send + Sync> {
139-
{
140-
let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
141-
*parent_rtcp_reader = Some(reader);
142-
}
143-
144-
Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
133+
Arc::new(ReceiverReportRtcpReader {
134+
internal: Arc::clone(&self.internal),
135+
parent_rtcp_reader: reader,
136+
})
145137
}
146138

147139
/// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method

0 commit comments

Comments
 (0)