Skip to content

Commit

Permalink
generate notify info
Browse files Browse the repository at this point in the history
  • Loading branch information
harlanc committed Mar 19, 2023
1 parent 53b1623 commit 976f65a
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 91 deletions.
14 changes: 11 additions & 3 deletions protocol/hls/src/flv_data_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
ChannelData, ChannelDataConsumer, ChannelEvent, ChannelEventProducer,
},
rtmp::session::{
common::SubscriberInfo,
common::{NotifyInfo, SubscriberInfo},
define::SubscribeType,
errors::{SessionError, SessionErrorValue},
},
Expand Down Expand Up @@ -104,10 +104,14 @@ impl FlvDataReceiver {

loop {
let (sender, receiver) = oneshot::channel();

/*the sub info is only used to transfer from RTMP to HLS, but not for client player */
let sub_info = SubscriberInfo {
id: self.subscriber_id,
sub_type: SubscribeType::PlayerHls,
sub_type: SubscribeType::GenerateHls,
notify_info: NotifyInfo {
request_url: String::from(""),
remote_addr: String::from(""),
},
};

let subscribe_event = ChannelEvent::Subscribe {
Expand Down Expand Up @@ -155,6 +159,10 @@ impl FlvDataReceiver {
let sub_info = SubscriberInfo {
id: self.subscriber_id,
sub_type: SubscribeType::PlayerHls,
notify_info: NotifyInfo {
request_url: String::from(""),
remote_addr: String::from(""),
},
};

let subscribe_event = ChannelEvent::UnSubscribe {
Expand Down
18 changes: 16 additions & 2 deletions protocol/httpflv/src/httpflv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use {
cache::metadata::MetaData,
channels::define::{ChannelData, ChannelDataConsumer, ChannelEvent, ChannelEventProducer},
session::{
common::SubscriberInfo,
common::{NotifyInfo, SubscriberInfo},
define::SubscribeType,
errors::{SessionError, SessionErrorValue},
},
},
bytes::BytesMut,
std::time::Duration,
std::{net::SocketAddr, time::Duration},
tokio::{
sync::{mpsc, oneshot},
time::sleep,
Expand All @@ -32,6 +32,8 @@ pub struct HttpFlv {
data_consumer: ChannelDataConsumer,
http_response_data_producer: HttpResponseDataProducer,
subscriber_id: Uuid,
request_url: String,
remote_addr: SocketAddr,
}

impl HttpFlv {
Expand All @@ -40,6 +42,8 @@ impl HttpFlv {
stream_name: String,
event_producer: ChannelEventProducer,
http_response_data_producer: HttpResponseDataProducer,
request_url: String,
remote_addr: SocketAddr,
) -> Self {
let (_, data_consumer) = mpsc::unbounded_channel();
let subscriber_id = Uuid::new_v4();
Expand All @@ -52,6 +56,8 @@ impl HttpFlv {
event_producer,
http_response_data_producer,
subscriber_id,
request_url,
remote_addr,
}
}

Expand Down Expand Up @@ -140,6 +146,10 @@ impl HttpFlv {
let sub_info = SubscriberInfo {
id: self.subscriber_id,
sub_type: SubscribeType::PlayerHttpFlv,
notify_info: NotifyInfo {
request_url: self.request_url.clone(),
remote_addr: self.remote_addr.to_string(),
},
};

let subscribe_event = ChannelEvent::UnSubscribe {
Expand All @@ -163,6 +173,10 @@ impl HttpFlv {
let sub_info = SubscriberInfo {
id: self.subscriber_id,
sub_type: SubscribeType::PlayerHttpFlv,
notify_info: NotifyInfo {
request_url: self.request_url.clone(),
remote_addr: self.remote_addr.to_string(),
},
};

let subscribe_event = ChannelEvent::Subscribe {
Expand Down
12 changes: 9 additions & 3 deletions protocol/httpflv/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use {
super::httpflv::HttpFlv,
futures::channel::mpsc::unbounded,
hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
},
rtmp::channels::define::ChannelEventProducer,
std::net::SocketAddr,
};

type GenericError = Box<dyn std::error::Error + Send + Sync>;
Expand All @@ -15,6 +17,7 @@ static NOTFOUND: &[u8] = b"Not Found";
async fn handle_connection(
req: Request<Body>,
event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer
remote_addr: SocketAddr,
) -> Result<Response<Body>> {
let path = req.uri().path();

Expand All @@ -33,6 +36,8 @@ async fn handle_connection(
stream_name,
event_producer,
http_response_data_producer,
String::from(path),
remote_addr,
);

tokio::spawn(async move {
Expand All @@ -59,11 +64,12 @@ pub async fn run(event_producer: ChannelEventProducer, port: usize) -> Result<()
let listen_address = format!("0.0.0.0:{port}");
let sock_addr = listen_address.parse().unwrap();

let new_service = make_service_fn(move |_| {
let new_service = make_service_fn(move |socket: &AddrStream| {
let remote_addr = socket.remote_addr();
let flv_copy = event_producer.clone();
async {
async move {
Ok::<_, GenericError>(service_fn(move |req| {
handle_connection(req, flv_copy.clone())
handle_connection(req, flv_copy.clone(), remote_addr)
}))
}
});
Expand Down
65 changes: 8 additions & 57 deletions protocol/rtmp/src/channels/define.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use {
crate::session::common::SubscriberInfo,
crate::session::common::{PublisherInfo, SubscriberInfo},
crate::statistics::StreamStatistics,
bytes::BytesMut,
serde::Serialize,
std::fmt,
tokio::sync::{broadcast, mpsc, oneshot},
};
Expand Down Expand Up @@ -31,12 +32,13 @@ pub type StreamStatisticSizeSender = oneshot::Sender<usize>;
pub type StreamStatisticSizeReceiver = oneshot::Sender<usize>;

type ChannelResponder<T> = oneshot::Sender<T>;
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub enum ChannelEvent {
Subscribe {
app_name: String,
stream_name: String,
info: SubscriberInfo,
#[serde(skip_serializing)]
responder: ChannelResponder<ChannelDataConsumer>,
},
UnSubscribe {
Expand All @@ -47,73 +49,22 @@ pub enum ChannelEvent {
Publish {
app_name: String,
stream_name: String,
info: PublisherInfo,
#[serde(skip_serializing)]
responder: ChannelResponder<ChannelDataProducer>,
},
UnPublish {
app_name: String,
stream_name: String,
info: PublisherInfo,
},
#[serde(skip_serializing)]
Api {
data_sender: AvStatisticSender,
size_sender: StreamStatisticSizeSender,
},
}

impl fmt::Display for ChannelEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ChannelEvent::Subscribe {
app_name,
stream_name,
info,
responder: _,
} => {
write!(
f,
"receive event, event_name: Subscribe, app_name: {},stream_name: {}, subscriber id: {}",
app_name, stream_name, info.id,
)
}
ChannelEvent::UnSubscribe {
app_name,
stream_name,
info,
} => {
write!(
f,
"receive event, event_name: UnSubscribe, app_name: {},stream_name: {}, subscriber id: {}",
app_name, stream_name, info.id,
)
}
ChannelEvent::Publish {
app_name,
stream_name,
responder: _,
} => {
write!(
f,
"receive event, event_name: Publish, app_name: {app_name},stream_name: {stream_name}",
)
}
ChannelEvent::UnPublish {
app_name,
stream_name,
} => {
write!(
f,
"receive event, event_name: UnPublish, app_name: {app_name},stream_name: {stream_name}",
)
}
ChannelEvent::Api {
data_sender: _,
size_sender: _,
} => {
write!(f, "receive event, event_name: Api",)
}
}
}
}

#[derive(Debug)]
pub enum TransmitterEvent {
Subscribe {
Expand Down
10 changes: 8 additions & 2 deletions protocol/rtmp/src/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ impl Transmitter {
match info.sub_type {
SubscribeType::PlayerRtmp
| SubscribeType::PlayerHttpFlv
| SubscribeType::PlayerHls => {
| SubscribeType::PlayerHls
| SubscribeType::GenerateHls => {
if let Some(meta_body_data) = self.cache.get_metadata() {
producer.send(meta_body_data).map_err(|_| ChannelError {
value: ChannelErrorValue::SendError,
Expand Down Expand Up @@ -245,12 +246,16 @@ impl ChannelsManager {

pub async fn event_loop(&mut self) {
while let Some(message) = self.channel_event_consumer.recv().await {
log::info!("{}", message);
if let Ok(data) = serde_json::to_string(&message) {
log::info!("event data: {}", data);
}

match message {
ChannelEvent::Publish {
app_name,
stream_name,
responder,
info,
} => {
let rv = self.publish(&app_name, &stream_name);
match rv {
Expand All @@ -269,6 +274,7 @@ impl ChannelsManager {
ChannelEvent::UnPublish {
app_name,
stream_name,
info,
} => {
if let Err(err) = self.unpublish(&app_name, &stream_name) {
log::error!(
Expand Down
27 changes: 21 additions & 6 deletions protocol/rtmp/src/session/client_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,44 @@ pub struct ClientSession {
/* Used to mark the subscriber's the data producer
in channels and delete it from map when unsubscribe
is called. */
subscriber_id: Uuid,
session_id: Uuid,
state: ClientSessionState,
client_type: ClientType,
}

impl ClientSession {
#[allow(dead_code)]
pub fn new(
stream: TcpStream,
client_type: ClientType,
app_name: String,
stream_name: String,
event_producer: ChannelEventProducer,
) -> Self {
let remote_addr = if let Ok(addr) = stream.peer_addr() {
log::info!("server session: {}", addr.to_string());
Some(addr)
} else {
None
};

let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
let subscriber_id = Uuid::new_v4();

Self {
io: Arc::clone(&net_io),
common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client),
common: Common::new(
Arc::clone(&net_io),
event_producer,
SessionType::Client,
remote_addr,
),
handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
unpacketizer: ChunkUnpacketizer::new(),
app_name,
stream_name,
client_type,
state: ClientSessionState::Handshake,
subscriber_id,
session_id: subscriber_id,
}
}

Expand Down Expand Up @@ -454,15 +465,19 @@ impl ClientSession {
.subscribe_from_channels(
self.app_name.clone(),
self.stream_name.clone(),
self.subscriber_id,
self.session_id,
)
.await?;
}
"NetStream.Publish.Reset" => {}

"NetStream.Play.Start" => {
self.common
.publish_to_channels(self.app_name.clone(), self.stream_name.clone())
.publish_to_channels(
self.app_name.clone(),
self.stream_name.clone(),
self.session_id,
)
.await?
}
_ => {}
Expand Down
Loading

0 comments on commit 976f65a

Please sign in to comment.