Skip to content

Commit

Permalink
Event listener crash fix
Browse files Browse the repository at this point in the history
  • Loading branch information
t348575 committed Apr 23, 2024
1 parent b192c9b commit 86c8767
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 49 deletions.
4 changes: 2 additions & 2 deletions src/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub async fn run(
get_spade_using.clone_from(&channels[idx].1.channel_name);
}

if channels_status[&user_id] != info.broadcast_id.is_some() {
*channels_status.get_mut(&user_id).unwrap() = info.broadcast_id.is_some();
if channels_status[&user_id] != info.live {
*channels_status.get_mut(&user_id).unwrap() = info.live;
events_tx
.send_async(Events::Live {
channel_id: user_id.clone(),
Expand Down
107 changes: 60 additions & 47 deletions src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,58 +512,71 @@ async fn event_listener(
events_rx: Receiver<Events>,
tx: Sender<String>,
access_token: String,
) -> Result<()> {
while let Ok(events) = events_rx.recv_async().await {
let mut writer = pubsub.write().await;
match events {
Events::Live {
channel_id,
broadcast_id,
} => {
if let Some(s) = writer.streamers.get_mut(&channel_id) {
info!(
"Live status of {} is {}",
s.info.channel_name,
broadcast_id.is_some()
);
s.info.live = broadcast_id.is_some();
s.info.broadcast_id = broadcast_id;

let channel_id = channel_id.as_str().parse()?;
let nonce = Alphanumeric.sample_string(&mut rand::thread_rng(), 30);
let topics = [
Topics::PredictionsChannelV1(PredictionsChannelV1 { channel_id }),
Topics::CommunityPointsUserV1(CommunityPointsUserV1 { channel_id }),
Topics::Raid(Raid { channel_id }),
];

let cmds = if s.info.live {
topics
.into_iter()
.map(|x| {
listen_command(&[x], access_token.as_str(), nonce.as_str())
.context("Generate listen command")
})
.collect::<Result<Vec<_>, _>>()
} else {
topics
.into_iter()
.map(|x| {
unlisten_command(&[x], nonce.as_str())
.context("Generate unlisten command")
})
.collect::<Result<Vec<_>, _>>()
}?;

for item in cmds {
tx.send_async(item).await?;
) {
async fn inner(
pubsub: &Arc<RwLock<PubSub>>,
events_rx: &Receiver<Events>,
tx: &Sender<String>,
access_token: &str,
) -> Result<()> {
while let Ok(events) = events_rx.recv_async().await {
let mut writer = pubsub.write().await;
match events {
Events::Live {
channel_id,
broadcast_id,
} => {
if let Some(s) = writer.streamers.get_mut(&channel_id) {
info!(
"Live status of {} is {}",
s.info.channel_name,
broadcast_id.is_some()
);
s.info.live = broadcast_id.is_some();
s.info.broadcast_id = broadcast_id;

let channel_id = channel_id.as_str().parse()?;
let nonce = Alphanumeric.sample_string(&mut rand::thread_rng(), 30);
let topics = [
Topics::PredictionsChannelV1(PredictionsChannelV1 { channel_id }),
Topics::CommunityPointsUserV1(CommunityPointsUserV1 { channel_id }),
Topics::Raid(Raid { channel_id }),
];

let cmds = if s.info.live {
topics
.into_iter()
.map(|x| {
listen_command(&[x], access_token, nonce.as_str())
.context("Generate listen command")
})
.collect::<Result<Vec<_>, _>>()
} else {
topics
.into_iter()
.map(|x| {
unlisten_command(&[x], nonce.as_str())
.context("Generate unlisten command")
})
.collect::<Result<Vec<_>, _>>()
}?;

for item in cmds {
tx.send_async(item).await?;
}
}
}
Events::SpadeUpdate(s) => writer.spade_url = Some(s),
}
Events::SpadeUpdate(s) => writer.spade_url = Some(s),
}
Ok(())
}

loop {
if let Err(err) = inner(&pubsub, &events_rx, &tx, &access_token).await {
error!("{err:#?}");
}
}
Ok(())
}

pub async fn prediction_logic(
Expand Down

0 comments on commit 86c8767

Please sign in to comment.