Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Live streaming with media over quic #27

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
RUSTC_WRAPPER: "sccache"
steps:
- name: Checkout
uses: actions/checkout@master
uses: actions/checkout@v4
with:
submodules: recursive

Expand All @@ -55,7 +55,7 @@ jobs:
HOME: /root #added based on https://github.com/actions/setup-go/issues/116

- name: Run sccache-cache
uses: mozilla-actions/sccache-action@v0.0.3
uses: mozilla-actions/sccache-action@v0.0.4

- name: check
run: |
Expand Down Expand Up @@ -100,4 +100,4 @@ jobs:
done
env:
XDG_CACHE_HOME: /root/.cache
HOME: /root #added based on https://github.com/actions/setup-go/issues/116
HOME: /root #added based on https://github.com/actions/setup-go/issues/116
20 changes: 20 additions & 0 deletions iroh-gateway/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ tokio-rustls-acme = { version = "0.2.0", features = ["axum"] }
hyper-util = "0.1.2"
rustls-pemfile = "1.0.2"
tower-service = "0.3.2"
mime_guess = "2.0.4"
86 changes: 52 additions & 34 deletions iroh-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ struct Inner {
#[debug("MimeClassifier")]
mime_classifier: MimeClassifier,
/// Cache of hashes to mime types
mime_cache: Mutex<LruCache<Hash, (u64, Mime)>>,
mime_cache: Mutex<LruCache<(Hash, Option<String>), (u64, Mime)>>,
/// Cache of hashes to collections
collection_cache: Mutex<LruCache<Hash, Collection>>,
}
Expand Down Expand Up @@ -175,25 +175,19 @@ async fn get_collection(
return Ok(res.clone());
}
let (collection, headers) = get_collection_inner(hash, connection, true).await?;
let mimes = headers
.into_iter()
.map(|(hash, size, header)| {
let mime = gateway.mime_classifier.classify(
mime_classifier::LoadContext::Browsing,
mime_classifier::NoSniffFlag::Off,
mime_classifier::ApacheBugFlag::On,
&None,
&header,
);
(hash, size, mime)
})
.collect::<Vec<_>>();
{
let mut cache = gateway.mime_cache.lock().unwrap();
for (hash, size, mime) in mimes {
cache.put(hash, (size, mime));
}

let mut cache = gateway.mime_cache.lock().unwrap();
for (name, hash) in collection.iter() {
let ext = get_extension(name);
let Some((hash, size, data)) = headers.iter().find(|(h, _, _)| h == hash) else {
tracing::debug!("hash {hash:?} for name {name:?} not found in headers");
continue;
};
let mime = get_mime_from_ext_and_data(ext.as_deref(), &data, &gateway.mime_classifier);
let key = (*hash, ext);
cache.put(key, (*size, mime));
}
drop(cache);

gateway
.collection_cache
Expand All @@ -203,9 +197,16 @@ async fn get_collection(
Ok(collection)
}

fn get_extension(name: &str) -> Option<String> {
std::path::Path::new(name)
.extension()
.map(|s| s.to_string_lossy().to_string())
}

/// Get the mime type for a hash from the remote node.
async fn get_mime_type_inner(
hash: &Hash,
ext: Option<&str>,
connection: &quinn::Connection,
mime_classifier: &MimeClassifier,
) -> anyhow::Result<(u64, Mime)> {
Expand All @@ -223,31 +224,46 @@ async fn get_mime_type_inner(
anyhow::bail!("unexpected response");
};
let _stats = at_closing.next().await?;
let mime = get_mime_from_ext_and_data(ext, &data, mime_classifier);
Ok((size, mime))
}

fn get_mime_from_ext_and_data(
ext: Option<&str>,
data: &[u8],
mime_classifier: &MimeClassifier,
) -> Mime {
let context = mime_classifier::LoadContext::Browsing;
let no_sniff_flag = mime_classifier::NoSniffFlag::Off;
let no_sniff_flag = mime_classifier::NoSniffFlag::On;
let apache_bug_flag = mime_classifier::ApacheBugFlag::On;
let supplied_type = None;
let mime = mime_classifier.classify(
let supplied_type = match ext {
None => None,
Some(ext) => mime_guess::from_ext(ext).first(),
};
mime_classifier.classify(
context,
no_sniff_flag,
apache_bug_flag,
&supplied_type,
&data,
);
Ok((size, mime))
data,
)
}

/// Get the mime type for a hash, either from the cache or by requesting it from the node.
async fn get_mime_type(
gateway: &Gateway,
hash: &Hash,
name: Option<&str>,
connection: &quinn::Connection,
) -> anyhow::Result<(u64, Mime)> {
if let Some(sm) = gateway.mime_cache.lock().unwrap().get(hash) {
let ext = name.map(|n| get_extension(n)).flatten();
let key = (*hash, ext.clone());
if let Some(sm) = gateway.mime_cache.lock().unwrap().get(&key) {
return Ok(sm.clone());
}
let sm = get_mime_type_inner(hash, connection, &gateway.mime_classifier).await?;
gateway.mime_cache.lock().unwrap().put(*hash, sm.clone());
let sm =
get_mime_type_inner(hash, ext.as_deref(), connection, &gateway.mime_classifier).await?;
gateway.mime_cache.lock().unwrap().put(key, sm.clone());
Ok(sm)
}

Expand All @@ -259,7 +275,7 @@ async fn handle_local_blob_request(
) -> std::result::Result<Response<Body>, AppError> {
let connection = gateway.get_default_connection().await?;
let byte_range = parse_byte_range(req).await?;
let res = forward_range(&gateway, connection, &blake3_hash, byte_range).await?;
let res = forward_range(&gateway, connection, &blake3_hash, None, byte_range).await?;
Ok(res)
}

Expand Down Expand Up @@ -299,7 +315,7 @@ async fn handle_ticket_index(
let hash = ticket.hash();
let prefix = format!("/ticket/{}", ticket);
let res = match ticket.format() {
BlobFormat::Raw => forward_range(&gateway, connection, &hash, byte_range)
BlobFormat::Raw => forward_range(&gateway, connection, &hash, None, byte_range)
.await?
.into_response(),
BlobFormat::HashSeq => collection_index(&gateway, connection, &hash, &prefix)
Expand Down Expand Up @@ -345,7 +361,8 @@ async fn collection_index(
for (name, child_hash) in collection.iter() {
let url = format!("{}/{}", link_prefix, name);
let url = encode_relative_url(&url)?;
let smo = gateway.mime_cache.lock().unwrap().get(child_hash).cloned();
let key = (*child_hash, get_extension(name));
let smo = gateway.mime_cache.lock().unwrap().get(&key).cloned();
res.push_str(&format!("<a href=\"{}\">{}</a>", url, name,));
if let Some((size, mime)) = smo {
res.push_str(&format!(" ({}, {})", mime, indicatif::HumanBytes(size)));
Expand Down Expand Up @@ -373,7 +390,7 @@ async fn forward_collection_range(
let collection = get_collection(gateway, hash, &connection).await?;
for (name, hash) in collection.iter() {
if name == suffix {
let res = forward_range(gateway, connection, hash, range).await?;
let res = forward_range(gateway, connection, hash, Some(suffix), range).await?;
return Ok(res.into_response());
} else {
tracing::trace!("'{}' != '{}'", name, suffix);
Expand All @@ -400,16 +417,17 @@ async fn forward_range(
gateway: &Gateway,
connection: quinn::Connection,
hash: &Hash,
name: Option<&str>,
(start, end): (Option<u64>, Option<u64>),
) -> anyhow::Result<Response<Body>> {
// we need both byte ranges and chunk ranges.
// chunk ranges to request data, and byte ranges to return the data.
tracing::debug!("forward_range {:?} {:?}", start, end);
tracing::debug!("forward_range {:?} {:?} (name {name:?})", start, end);

let byte_ranges = to_byte_range(start, end);
let chunk_ranges = to_chunk_range(start, end);
tracing::debug!("got connection");
let (_size, mime) = get_mime_type(gateway, hash, &connection).await?;
let (_size, mime) = get_mime_type(gateway, hash, name, &connection).await?;
tracing::debug!("mime: {}", mime);
let chunk_ranges = RangeSpecSeq::from_ranges(vec![chunk_ranges]);
let request = iroh::bytes::protocol::GetRequest::new(*hash, chunk_ranges.clone());
Expand Down
Loading
Loading