
Commit d06c065

support streaming content from S3, start using it in the webserver

1 parent ff5ebf0

File tree

8 files changed, +261 -68 lines

Cargo.lock

Lines changed: 24 additions & 4 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -64,8 +64,10 @@ derive_more = { version = "2.0.0", features = ["display"] }
 
 # Async
 tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] }
+tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
 futures-util = "0.3.5"
 async-stream = "0.3.5"
+async-compression = { version = "0.4.25", features = ["tokio", "bzip2", "zstd", "gzip"] }
 aws-config = "1.0.0"
 aws-sdk-s3 = "1.3.0"
 aws-sdk-cloudfront = "1.3.0"
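
The two new dependencies carry the streaming work: async-compression supplies tokio-compatible streaming decoders for the zstd, bzip2 and gzip algorithms already used for stored blobs, and tokio-util's "io" feature bridges AsyncRead sources into byte streams for HTTP responses. A minimal sketch of the decoder usage (illustrative only, not code from this commit):

use async_compression::tokio::bufread::ZstdDecoder;
use tokio::io::{AsyncRead, AsyncReadExt as _, BufReader};

// Decompress a zstd-compressed async stream without buffering the whole
// compressed input first. The bufread decoders take any AsyncBufRead, so a
// plain AsyncRead is wrapped in a tokio BufReader.
async fn read_zstd(raw: impl AsyncRead + Unpin) -> std::io::Result<Vec<u8>> {
    let mut decoder = ZstdDecoder::new(BufReader::new(raw));
    let mut out = Vec::new();
    decoder.read_to_end(&mut out).await?;
    Ok(out)
}
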

src/storage/database.rs

Lines changed: 11 additions & 9 deletions
@@ -1,9 +1,9 @@
-use super::{Blob, FileRange};
+use super::{Blob, FileRange, StreamingBlob};
 use crate::{InstanceMetrics, db::Pool, error::Result};
 use chrono::{DateTime, Utc};
 use futures_util::stream::{Stream, TryStreamExt};
 use sqlx::Acquire;
-use std::sync::Arc;
+use std::{io, sync::Arc};
 
 pub(crate) struct DatabaseBackend {
     pool: Pool,
@@ -58,15 +58,14 @@ impl DatabaseBackend {
         }
     }
 
-    pub(super) async fn get(
+    pub(super) async fn get_stream(
         &self,
         path: &str,
-        max_size: usize,
         range: Option<FileRange>,
-    ) -> Result<Blob> {
-        // The maximum size for a BYTEA (the type used for `content`) is 1GB, so this cast is safe:
+    ) -> Result<StreamingBlob> {
+        // The maximum size for a BYTEA (the type used for `content`) is 1GB.
         // https://www.postgresql.org/message-id/162867790712200946i7ba8eb92v908ac595c0c35aee%40mail.gmail.com
-        let max_size = max_size.min(i32::MAX as usize) as i32;
+        let max_size = 2_i32.pow(30);
 
         struct Result {
             path: String,
@@ -126,14 +125,17 @@ impl DatabaseBackend {
                 i.try_into()
                     .expect("invalid compression algorithm stored in database")
             });
-        Ok(Blob {
+        let content = result.content.unwrap_or_default();
+        let content_len = content.len();
+        Ok(StreamingBlob {
             path: result.path,
             mime: result
                 .mime
                 .parse()
                 .unwrap_or(mime::APPLICATION_OCTET_STREAM),
             date_updated: result.date_updated,
-            content: result.content.unwrap_or_default(),
+            content: Box::new(io::Cursor::new(content)),
+            content_length: content_len,
             compression,
         })
     }
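
The database backend already has the full BYTEA content in memory, so it satisfies the new streaming interface by wrapping that buffer in std::io::Cursor, for which tokio provides an AsyncRead implementation. A small illustration of that trick (a sketch, not commit code):

use std::io;
use tokio::io::{AsyncRead, AsyncReadExt as _};

// An owned, already-buffered Vec<u8> exposed through the same
// `Box<dyn AsyncRead + Unpin + Send>` type the S3 backend uses.
fn as_stream(content: Vec<u8>) -> Box<dyn AsyncRead + Unpin + Send> {
    Box::new(io::Cursor::new(content))
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader = as_stream(b"hello".to_vec());
    let mut out = Vec::new();
    reader.read_to_end(&mut out).await?;
    assert_eq!(out, b"hello");
    Ok(())
}
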

src/storage/mod.rs

Lines changed: 166 additions & 17 deletions
@@ -25,14 +25,17 @@ use path_slash::PathExt;
 use std::{
     fmt,
     fs::{self, File},
-    io::{self, BufReader},
+    io::{self, BufReader, Write as _},
     num::ParseIntError,
     ops::RangeInclusive,
     path::{Path, PathBuf},
     sync::Arc,
 };
 use std::{iter, str::FromStr};
-use tokio::{io::AsyncWriteExt, runtime::Runtime};
+use tokio::{
+    io::{AsyncRead, AsyncReadExt as _, AsyncWriteExt},
+    runtime::Runtime,
+};
 use tracing::{error, info_span, instrument, trace};
 use walkdir::WalkDir;
 
@@ -57,6 +60,80 @@ impl Blob {
     }
 }
 
+pub(crate) struct StreamingBlob {
+    pub(crate) path: String,
+    pub(crate) mime: Mime,
+    pub(crate) date_updated: DateTime<Utc>,
+    pub(crate) compression: Option<CompressionAlgorithm>,
+    pub(crate) content_length: usize,
+    pub(crate) content: Box<dyn AsyncRead + Unpin + Send>,
+}
+
+impl std::fmt::Debug for StreamingBlob {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("StreamingBlob")
+            .field("path", &self.path)
+            .field("mime", &self.mime)
+            .field("date_updated", &self.date_updated)
+            .field("compression", &self.compression)
+            .finish()
+    }
+}
+
+impl StreamingBlob {
+    /// wrap the content stream in a streaming decompressor according to the
+    /// algorithm found in `compression` attribute.
+    pub(crate) fn decompress(mut self) -> Self {
+        let Some(alg) = self.compression else {
+            return self;
+        };
+
+        match alg {
+            CompressionAlgorithm::Zstd => {
+                self.content = Box::new(async_compression::tokio::bufread::ZstdDecoder::new(
+                    tokio::io::BufReader::new(self.content),
+                ))
+            }
+            CompressionAlgorithm::Bzip2 => {
+                self.content = Box::new(async_compression::tokio::bufread::BzDecoder::new(
+                    tokio::io::BufReader::new(self.content),
+                ))
+            }
+            CompressionAlgorithm::Gzip => {
+                self.content = Box::new(async_compression::tokio::bufread::GzipDecoder::new(
+                    tokio::io::BufReader::new(self.content),
+                ))
+            }
+        };
+        self.compression = None;
+        self
+    }
+
+    pub(crate) async fn materialize(mut self, max_size: usize) -> Result<Blob> {
+        self = self.decompress();
+
+        let mut content = crate::utils::sized_buffer::SizedBuffer::new(max_size);
+        content.reserve(self.content_length);
+
+        let mut buf = [0u8; 8 * 1024];
+        loop {
+            let n = self.content.read(&mut buf).await?;
+            if n == 0 {
+                break;
+            }
+            content.write_all(&buf[..n])?;
+        }
+
+        Ok(Blob {
+            path: self.path,
+            mime: self.mime,
+            date_updated: self.date_updated,
+            content: content.into_inner(),
+            compression: self.compression,
+        })
+    }
+}
+
 pub fn get_file_list<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = Result<PathBuf>>> {
     let path = path.as_ref().to_path_buf();
     if path.is_file() {
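
To make the intent of StreamingBlob concrete, here is a hedged sketch of the two ways it gets consumed: buffered callers go through materialize, which enforces the existing size limit via SizedBuffer, while streaming callers read content incrementally (the get_stream method further down already applies decompress). Illustrative only; `storage`, the path and the 1 MiB cap are made-up parameters, and `Result` is the crate-wide result type:

use tokio::io::AsyncReadExt as _;

async fn example(storage: &AsyncStorage) -> Result<()> {
    // Buffered path: equivalent to the old `get`, capped at 1 MiB here.
    let blob: Blob = storage
        .get_stream("some/path")
        .await?
        .materialize(1024 * 1024)
        .await?;
    println!("{} bytes in memory", blob.content.len());

    // Streaming path: forward decompressed bytes chunk by chunk.
    let mut streaming = storage.get_stream("some/path").await?;
    let mut chunk = [0u8; 8 * 1024];
    loop {
        let n = streaming.content.read(&mut chunk).await?;
        if n == 0 {
            break;
        }
        // ... write `&chunk[..n]` into an HTTP response body ...
    }
    Ok(())
}
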
@@ -210,6 +287,35 @@ impl AsyncStorage {
         })
     }
 
+    /// Fetch a rustdoc file from our blob storage.
+    /// * `name` - the crate name
+    /// * `version` - the crate version
+    /// * `latest_build_id` - the id of the most recent build, used purely to invalidate the local
+    ///   archive index cache when `archive_storage` is `true`. Without it we wouldn't know that we
+    ///   have to invalidate the locally cached file after a rebuild.
+    /// * `path` - the wanted path inside the documentation.
+    /// * `archive_storage` - if `true`, we will assume we have a remote ZIP archive and an index
+    ///   where we can fetch the requested path from inside the ZIP file.
+    #[instrument]
+    pub(crate) async fn stream_rustdoc_file(
+        &self,
+        name: &str,
+        version: &str,
+        latest_build_id: Option<BuildId>,
+        path: &str,
+        archive_storage: bool,
+    ) -> Result<StreamingBlob> {
+        trace!("fetch rustdoc file");
+        Ok(if archive_storage {
+            self.stream_from_archive(&rustdoc_archive_path(name, version), latest_build_id, path)
+                .await?
+        } else {
+            // Add rustdoc prefix, name and version to the path for accessing the file stored in the database
+            let remote_path = format!("rustdoc/{name}/{version}/{path}");
+            self.get_stream(&remote_path).await?
+        })
+    }
+
     #[context("fetching {path} from {name} {version} (archive: {archive_storage})")]
     pub(crate) async fn fetch_source_file(
         &self,
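
stream_rustdoc_file is the entry point the webserver side of this commit starts using. A hedged sketch of the handler-side wiring, assuming an axum-style response and using tokio_util::io::ReaderStream (enabled by the new tokio-util "io" feature) to turn the AsyncRead into a response body; the crate name, version and file path below are placeholders:

use axum::{body::Body, response::Response};
use tokio_util::io::ReaderStream;

async fn serve_rustdoc_file(storage: &AsyncStorage) -> Result<Response> {
    let blob = storage
        .stream_rustdoc_file("serde", "1.0.0", None, "serde/index.html", true)
        .await?;

    // ReaderStream adapts the AsyncRead into a Stream of Bytes chunks,
    // so the file is forwarded without being buffered in memory first.
    let body = Body::from_stream(ReaderStream::new(blob.content));
    Ok(Response::builder()
        .header("Content-Type", blob.mime.to_string())
        .body(body)?)
}
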
@@ -282,15 +388,16 @@
 
     #[instrument]
     pub(crate) async fn get(&self, path: &str, max_size: usize) -> Result<Blob> {
-        let mut blob = match &self.backend {
-            StorageBackend::Database(db) => db.get(path, max_size, None).await,
-            StorageBackend::S3(s3) => s3.get(path, max_size, None).await,
+        self.get_stream(path).await?.materialize(max_size).await
+    }
+
+    #[instrument]
+    pub(crate) async fn get_stream(&self, path: &str) -> Result<StreamingBlob> {
+        let blob = match &self.backend {
+            StorageBackend::Database(db) => db.get_stream(path, None).await,
+            StorageBackend::S3(s3) => s3.get_stream(path, None).await,
         }?;
-        if let Some(alg) = blob.compression {
-            blob.content = decompress(blob.content.as_slice(), alg, max_size)?;
-            blob.compression = None;
-        }
-        Ok(blob)
+        Ok(blob.decompress())
     }
 
     #[instrument]
@@ -301,18 +408,28 @@
         range: FileRange,
         compression: Option<CompressionAlgorithm>,
     ) -> Result<Blob> {
+        self.get_range_stream(path, range, compression)
+            .await?
+            .materialize(max_size)
+            .await
+    }
+
+    #[instrument]
+    pub(super) async fn get_range_stream(
+        &self,
+        path: &str,
+        range: FileRange,
+        compression: Option<CompressionAlgorithm>,
+    ) -> Result<StreamingBlob> {
         let mut blob = match &self.backend {
-            StorageBackend::Database(db) => db.get(path, max_size, Some(range)).await,
-            StorageBackend::S3(s3) => s3.get(path, max_size, Some(range)).await,
+            StorageBackend::Database(db) => db.get_stream(path, Some(range)).await,
+            StorageBackend::S3(s3) => s3.get_stream(path, Some(range)).await,
         }?;
         // `compression` represents the compression of the file-stream inside the archive.
         // We don't compress the whole archive, so the encoding of the archive's blob is irrelevant
         // here.
-        if let Some(alg) = compression {
-            blob.content = decompress(blob.content.as_slice(), alg, max_size)?;
-            blob.compression = None;
-        }
-        Ok(blob)
+        blob.compression = compression;
+        Ok(blob.decompress())
     }
 
     #[instrument]
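
Since the buffered get/get_range are now thin wrappers over the streaming variants plus materialize, the two paths should return identical bytes. A hypothetical check along these lines (not part of the commit) captures that invariant, using only the methods shown above:

async fn buffered_and_streamed_agree(storage: &AsyncStorage, path: &str) -> Result<()> {
    let max_size = 10 * 1024 * 1024;
    // Old-style buffered fetch vs. explicit stream + materialize.
    let buffered = storage.get(path, max_size).await?;
    let streamed = storage.get_stream(path).await?.materialize(max_size).await?;
    assert_eq!(buffered.content, streamed.content);
    assert_eq!(buffered.mime, streamed.mime);
    Ok(())
}
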
@@ -389,6 +506,38 @@
         })
     }
 
+    #[instrument]
+    pub(crate) async fn stream_from_archive(
+        &self,
+        archive_path: &str,
+        latest_build_id: Option<BuildId>,
+        path: &str,
+    ) -> Result<StreamingBlob> {
+        let index_filename = self
+            .download_archive_index(archive_path, latest_build_id)
+            .await?;
+
+        let info = {
+            let path = path.to_owned();
+            spawn_blocking(move || archive_index::find_in_file(index_filename, &path)).await
+        }?
+        .ok_or(PathNotFoundError)?;
+
+        let blob = self
+            .get_range_stream(archive_path, info.range(), Some(info.compression()))
+            .await?;
+        assert_eq!(blob.compression, None);
+
+        Ok(StreamingBlob {
+            path: format!("{archive_path}/{path}"),
+            mime: detect_mime(path),
+            date_updated: blob.date_updated,
+            content: blob.content,
+            content_length: blob.content_length,
+            compression: None,
+        })
+    }
+
     #[instrument(skip(self))]
     pub(crate) async fn store_all_in_archive(
         &self,
