Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic websocket support #753

Merged
merged 1 commit into from
May 26, 2021
Merged

Conversation

Johannesd3
Copy link
Contributor

(Copied from #734 (reply in thread))

So, here's my first attempt of websocket support: https://github.com/Johannesd3/librespot/tree/new-api/core/src/dealer

  • It uses tungstenite and tokio-tungstenite as websocket library.
  • There's a Builder struct to register handlers before a connection is established.
  • After the Builder is created, there are two ways to launch the Dealer:
  • Reconnecting: If the connection is closed or the server doesn't react on pings, there's an attempt to reconnect every 10 seconds (but all replies sent in between are lost). It calls a closure to get a new token, in case it expires.
  • Message handling: It's possible to subscribe to any uri an arbitrary number of times. Every message sent to a "sub-uri" will be received.
  • Request handling: One can add exactly one RequestHandler per uri. Currently, these handlers indicate success by returning a bool. But that's easy to change.
  • The handlers are stored in custom tree structs, hierarchally by uri components. Maybe it's too complicated - or maybe such a tree would also be useful for mercury.
  • Messages and requests are parsed with serde/serde_json.
  • Request payloads are gzip-decoded if necessary (using flate2). I was not able to test whether it works, I didn't receive a gzipped request (I didn't receive a request at all).

@Johannesd3
Copy link
Contributor Author

Build fails because of native-tls. I should try rustls.

core/src/session.rs Outdated Show resolved Hide resolved
Comment on lines +46 to +54
let mut aps = data.ap_list.into_iter().filter_map(|ap| {
let mut split = ap.rsplitn(2, ':');
let port = split
.next()
.expect("rsplitn should not return empty iterator");
let host = split.next()?.to_owned();
let port: u16 = port.parse().ok()?;
Some((host, port))
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roderickvd Note that I split up host:port already in apresolve. It just seemed easier for the moment, it's not meant to be final anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good idea.

Comment on lines +5 to +9
pub enum HandlerMap<T> {
Leaf(T),
Branch(HashMap<String, HandlerMap<T>>),
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use DashMap to avoid a mutex around this struct. There would be a lot of places where librespot could benefit of dashmap, so I leave it for a future PR.

core/src/dealer/mod.rs Outdated Show resolved Hide resolved
Comment on lines +170 to +265
pub fn launch_in_background<Fut, F>(self, get_url: F, proxy: Option<Url>) -> Dealer
where
Fut: Future<Output = Url> + Send + 'static,
F: (FnMut() -> Fut) + Send + 'static,
{
create_dealer!(self, shared -> run(shared, None, get_url, proxy))
}

pub async fn launch<Fut, F>(self, mut get_url: F, proxy: Option<Url>) -> WsResult<Dealer>
where
Fut: Future<Output = Url> + Send + 'static,
F: (FnMut() -> Fut) + Send + 'static,
{
let dealer = create_dealer!(self, shared -> {
// Try to connect.
let url = get_url().await;
let tasks = connect(&url, proxy.as_ref(), &shared).await?;

// If a connection is established, continue in a background task.
run(shared, Some(tasks), get_url, proxy)
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't use macros very often, but I like this solution. Note that one of the functions is async and returns a result, but the other doesn't.

core/src/dealer/protocol.rs Outdated Show resolved Hide resolved
@@ -117,7 +105,8 @@ impl Session {
audio_key: OnceCell::new(),
channel: OnceCell::new(),
mercury: OnceCell::new(),
handle,
dealer: tokio::sync::OnceCell::new(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it makes sense to store the dealer here.

(BTW tokio's async OnceCell was added after I created a feature request, so I had to use it)

Comment on lines 161 to 182
let session = if let Some(session) = session {
session
} else {
// Just don't finish and hope that someone will come along and clean up the Dealer.
futures_util::future::pending::<Infallible>().await;
unreachable!()
};

// TODO: How to handle errors here?

let token = keymaster::get_token(
&session,
"65b708073fc0480ea92a077233ca87bd",
"playlist-read",
)
.await
.unwrap();

let mut url: Url = "wss://dealer.spotify.com:443/".parse().unwrap();
url.query_pairs_mut()
.append_pair("access_token", &token.access_token);
url
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should happen if we aren't able to retrieve a token?

It was done on purpose that this closure can't return an error. What should the dealer do other than calling the closure again and again and again?

So it's probably better to periodically try it again inside of the closure. Even if the closure/future does never return: It won't block the executor, and the dealer task can't do anything without a uri.

If possible, we should probably cache tokens and reuse them if they aren't expired.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends on the reason for not getting a token I guess. When it's a connection error, you'd want to retry periodically and ideally with an increasing back-off timer to not cause a DoS. In case there's a connection, but an invalid response, the same and try some other servers.

For headless machines I'd prefer to just try indefinitely rather than bailing out after some time.

core/Cargo.toml Outdated Show resolved Hide resolved
core/Cargo.toml Outdated
tokio-stream = "0.1.1"
tokio-tungstenite = { version = "0.14", default-features = false, features = ["native-tls"] }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid there's no way to add websocket support without new dependencies.

native-tls failed in CI. I hesitated to use rustls because it brings up a new question: rustls uses the ring crate for cryptography. We could use the ring crate too for our crypto stuff to avoid duplication, but we won't be able to switch completely, e.g. because it doesn't support aes192.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you think ring is the better route to take, maybe we can port aes192 to ring, first as a crate extension in librespot, then submit it as a PR upstream? In the meantime fine to rely on both as long as there's a defined vision for the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately not: briansmith/ring#112

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a pity. The statement "Because nobody is using it in the real world." is debatable 😉

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking, "what does hyper-tls use"? And I see it's nativetls...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you like rustls better or is it to workaround failing CI or portability? I wouldn't mind binding to native libraries for cryptography because we can rely on them to be well-tested, performant and maintained.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rustls wouldn't be a bad choice: https://www.abetterinternet.org/post/preparing-rustls-for-wider-adoption/

I don't really care.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All valid arguments. Guess you can argue either way. Not partial to either here too, just thinking along how to make this as lean and maintainable as possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I learned that tonic(grpc framework based on hyper) supports only rustls.

Thankfully, it's very easy to switch.

.await
.unwrap();

let mut url: Url = "wss://dealer.spotify.com:443/".parse().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be getting this from the new apresolve.

@roderickvd
Copy link
Member

So, here's my first attempt of websocket support: https://github.com/Johannesd3/librespot/tree/new-api/core/src/dealer

Great work man. Again I suggest that we generally just work in a new-api branch, coordinate our efforts on chat, and add comments in the various commits. When we toy around some big ideas or alternatives we can set up PR's. But for this I see this as necessary and awesome groundwork so no need to wait on a PR workflow.

  • After the Builder is created, there are two ways to launch the Dealer:

I think this is great to cater both to headless machines and the likes of ncspot.

  • Reconnecting: If the connection is closed or the server doesn't react on pings, there's an attempt to reconnect every 10 seconds (but all replies sent in between are lost). It calls a closure to get a new token, in case it expires.

Doesn't seem problematic that everything is lost in the meantime right? No need to buffer.

@Johannesd3
Copy link
Contributor Author

Hopefully this dealer stuff will never be exposed as public api. I tried to write it as self-contained as possible and give it a flexible api, but only because its not clear how it will be used later. Probably one of these alternatives will prove as more useful.

About reconnecting: I'm not familiar with these protocols, so I don't what could happen if something gets lost. In the websocket case it won't be tragic, the only messages we send are success true/false. But if we want to use a similar reconnection logic for mercury, we have to take care not to swallow any requests silently.

Copy link
Member

@roderickvd roderickvd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not worth the trouble?

Edit: I'm fighting GitHub reviews sometimes. I meant to ask in reference to removing gzip support in 7d2050c.

@Johannesd3
Copy link
Contributor Author

I think I will merge it into the new-api branch soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants