Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
412 changes: 408 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ members = [
"atrium-crypto",
"atrium-xrpc",
"atrium-xrpc-client",
"atrium-streams",
"atrium-streams-client",
"bsky-cli",
"bsky-sdk",
]
Expand All @@ -26,6 +28,8 @@ keywords = ["atproto", "bluesky"]
atrium-api = { version = "0.24.3", path = "atrium-api" }
atrium-xrpc = { version = "0.11.3", path = "atrium-xrpc" }
atrium-xrpc-client = { version = "0.5.6", path = "atrium-xrpc-client" }
atrium-streams = { version = "0.1.0", path = "atrium-streams" }
atrium-streams-client = { version = "0.1.0", path = "atrium-streams-client" }
bsky-sdk = { version = "0.1.9", path = "bsky-sdk" }

# async in traits
Expand All @@ -35,6 +39,10 @@ async-trait = "0.1.80"
# DAG-CBOR codec
ipld-core = { version = "0.4.1", default-features = false, features = ["std"] }
serde_ipld_dagcbor = { version = "0.6.0", default-features = false, features = ["std"] }
cbor4ii = { version = "0.2.14", default-features = false, features = ["use_alloc"] }

# CAR files
rs-car = "0.4.1"

# Parsing and validation
chrono = "0.4"
Expand All @@ -55,8 +63,10 @@ rand = "0.8.5"

# Networking
futures = { version = "0.3.30", default-features = false, features = ["alloc"] }
async-stream = "0.3.5"
http = "1.1.0"
tokio = { version = "1.37", default-features = false }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }

# HTTP client integrations
isahc = "1.7.2"
Expand All @@ -76,3 +86,6 @@ mockito = "1.4"
# WebAssembly
wasm-bindgen-test = "0.3.41"
bumpalo = "~3.14.0"

# Code generation
bon = "2.2.1"
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ Definitions for XRPC request/response, and their associated errors.

A library provides clients that implement the `XrpcClient` defined in [atrium-xrpc](./atrium-xrpc/)

### [`atrium-streams`](./atrium-streams/)

Definitions for traits, types and utilities for dealing with event stream subscriptions. (WIP)

### [`atrium-streams-client`](./atrium-streams-client/)

A library that provides default implementations of the `EventStreamClient`, `Handlers` and `Subscription` defined in [atrium-streams](./atrium-streams/) for interacting with the variety of subscriptions in ATProto (WIP)

### [`bsky-sdk`](./bsky-sdk/)

[![](https://img.shields.io/crates/v/bsky-sdk)](https://crates.io/crates/bsky-sdk)
Expand Down
2 changes: 2 additions & 0 deletions atrium-streams-client/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
/Cargo.lock
5 changes: 5 additions & 0 deletions atrium-streams-client/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
27 changes: 27 additions & 0 deletions atrium-streams-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "atrium-streams-client"
version = "0.1.0"
authors = ["Elaina <17bestradiol@proton.me>"]
edition.workspace = true
rust-version.workspace = true
description = "Event Streams Client library for AT Protocol (Bluesky)"
documentation = "https://docs.rs/atrium-streams-client"
readme = "README.md"
repository.workspace = true
license.workspace = true
keywords.workspace = true

[dependencies]
atrium-xrpc.workspace = true
atrium-streams.workspace = true
futures.workspace = true
ipld-core.workspace = true
async-stream.workspace = true
tokio-tungstenite.workspace = true
serde_ipld_dagcbor.workspace = true
rs-car.workspace = true
tokio.workspace = true
bon.workspace = true
serde_html_form.workspace = true
serde.workspace = true
thiserror.workspace = true
1 change: 1 addition & 0 deletions atrium-streams-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# ATrium XRPC WSS Client
87 changes: 87 additions & 0 deletions atrium-streams-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//! This file provides a client for the `ATProto` XRPC over WSS protocol.
//! It implements the [`EventStreamClient`] trait for the [`WssClient`] struct.

use std::str::FromStr;

use futures::Stream;
use tokio::net::TcpStream;

use atrium_xrpc::{
http::{Request, Uri},
types::Header,
};
use bon::Builder;
use serde::Serialize;
use tokio_tungstenite::{
connect_async,
tungstenite::{self, handshake::client::generate_key},
MaybeTlsStream, WebSocketStream,
};

use atrium_streams::client::{EventStreamClient, XrpcUri};

/// An enum of possible error kinds for this crate.
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Invalid uri")]
InvalidUri,
#[error("Parsing parameters failed: {0}")]
ParsingParameters(#[from] serde_html_form::ser::Error),
#[error("Connection error: {0}")]
Connection(#[from] tungstenite::Error),
}

#[derive(Builder)]
pub struct WssClient<'a, P: Serialize> {
xrpc_uri: XrpcUri<'a>,
params: Option<P>,
}

type StreamKind = WebSocketStream<MaybeTlsStream<TcpStream>>;
impl<P: Serialize + Send + Sync> EventStreamClient<<StreamKind as Stream>::Item, Error>
for WssClient<'_, P>
{
async fn connect(&self) -> Result<impl Stream<Item = <StreamKind as Stream>::Item>, Error> {
let Self { xrpc_uri, params } = self;
let mut uri = xrpc_uri.to_uri();
//// Query parameters
if let Some(p) = &params {
uri.push('?');
uri += &serde_html_form::to_string(p)?;
};
////

//// Request
// Extracting the authority from the URI to set the Host header.
let uri = Uri::from_str(&uri).map_err(|_| Error::InvalidUri)?;
let authority = uri.authority().ok_or_else(|| Error::InvalidUri)?.as_str();
let host = authority
.find('@')
.map_or_else(|| authority, |idx| authority.split_at(idx + 1).1);

// Building the request.
let mut request = Request::builder()
.uri(&uri)
.method("GET")
.header("Host", host)
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", generate_key());

// Adding the ATProto headers.
if let Some(proxy) = self.atproto_proxy_header().await {
request = request.header(Header::AtprotoProxy, proxy);
}
if let Some(accept_labelers) = self.atproto_accept_labelers_header().await {
request = request.header(Header::AtprotoAcceptLabelers, accept_labelers.join(", "));
}

// In our case, the only thing that could possibly fail is the URI. The headers are all `String`/`&str`.
let request = request.body(()).map_err(|_| Error::InvalidUri)?;
////

let (stream, _) = connect_async(request).await?;
Ok(stream)
}
}
6 changes: 6 additions & 0 deletions atrium-streams-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod client;
pub use client::{Error, WssClient};

pub mod subscriptions;

pub use atrium_streams; // Re-export the atrium_streams crate
1 change: 1 addition & 0 deletions atrium-streams-client/src/subscriptions/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod repositories;
Loading