Skip to content

Commit 2c33fc4

Browse files
Merge pull request #92 from theseus-rs/add-extractors
feat!: add configurable extractors
2 parents 3d74b9a + f08b558 commit 2c33fc4

File tree

16 files changed

+334
-189
lines changed

16 files changed

+334
-189
lines changed

examples/archive_async/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ use postgresql_archive::{
77

88
#[tokio::main]
99
async fn main() -> Result<()> {
10+
let url = THESEUS_POSTGRESQL_BINARIES_URL;
1011
let version_req = VersionReq::STAR;
11-
let (archive_version, archive) =
12-
get_archive(THESEUS_POSTGRESQL_BINARIES_URL, &version_req).await?;
12+
let (archive_version, archive) = get_archive(url, &version_req).await?;
1313
let out_dir = tempfile::tempdir()?.into_path();
14-
extract(&archive, &out_dir).await?;
14+
extract(url, &archive, &out_dir).await?;
1515
println!(
1616
"PostgreSQL {} extracted to {}",
1717
archive_version,

examples/archive_sync/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ use postgresql_archive::blocking::{extract, get_archive};
55
use postgresql_archive::{Result, VersionReq, THESEUS_POSTGRESQL_BINARIES_URL};
66

77
fn main() -> Result<()> {
8+
let url = THESEUS_POSTGRESQL_BINARIES_URL;
89
let version_req = VersionReq::STAR;
9-
let (archive_version, archive) = get_archive(THESEUS_POSTGRESQL_BINARIES_URL, &version_req)?;
10+
let (archive_version, archive) = get_archive(url, &version_req)?;
1011
let out_dir = tempfile::tempdir()?.into_path();
11-
extract(&archive, &out_dir)?;
12+
extract(url, &archive, &out_dir)?;
1213
println!(
1314
"PostgreSQL {} extracted to {}",
1415
archive_version,

postgresql_archive/benches/archive.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn bench_extract(criterion: &mut Criterion) -> Result<()> {
2424
fn extract_archive(archive: &Vec<u8>) -> Result<()> {
2525
let out_dir = tempfile::tempdir()?.path().to_path_buf();
2626
create_dir_all(&out_dir)?;
27-
extract(archive, &out_dir)?;
27+
extract(THESEUS_POSTGRESQL_BINARIES_URL, archive, &out_dir)?;
2828
remove_dir_all(&out_dir)?;
2929
Ok(())
3030
}

postgresql_archive/src/archive.rs

Lines changed: 6 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,11 @@
11
//! Manage PostgreSQL archives
22
#![allow(dead_code)]
33

4-
use crate::error::Error::Unexpected;
54
use crate::error::Result;
6-
use crate::repository;
7-
use flate2::bufread::GzDecoder;
8-
use human_bytes::human_bytes;
9-
use num_format::{Locale, ToFormattedString};
5+
use crate::{extractor, repository};
106
use semver::{Version, VersionReq};
11-
use std::fs::{create_dir_all, remove_dir_all, remove_file, rename, File};
12-
use std::io::{copy, BufReader, Cursor};
13-
use std::path::{Path, PathBuf};
14-
use std::thread::sleep;
15-
use std::time::Duration;
16-
use tar::Archive;
17-
use tracing::{debug, instrument, warn};
7+
use std::path::Path;
8+
use tracing::instrument;
189

1910
pub const THESEUS_POSTGRESQL_BINARIES_URL: &str =
2011
"https://github.com/theseus-rs/postgresql-binaries";
@@ -47,164 +38,15 @@ pub async fn get_archive(url: &str, version_req: &VersionReq) -> Result<(Version
4738
Ok((version, bytes))
4839
}
4940

50-
/// Acquires a lock file in the [out_dir](Path) to prevent multiple processes from extracting the
51-
/// archive at the same time.
52-
///
53-
/// # Errors
54-
/// * If the lock file cannot be acquired.
55-
#[instrument(level = "debug")]
56-
fn acquire_lock(out_dir: &Path) -> Result<PathBuf> {
57-
let lock_file = out_dir.join("postgresql-archive.lock");
58-
59-
if lock_file.is_file() {
60-
let metadata = lock_file.metadata()?;
61-
let created = metadata.created()?;
62-
63-
if created.elapsed()?.as_secs() > 300 {
64-
warn!(
65-
"Stale lock file detected; removing file to attempt process recovery: {}",
66-
lock_file.to_string_lossy()
67-
);
68-
remove_file(&lock_file)?;
69-
}
70-
}
71-
72-
debug!(
73-
"Attempting to acquire lock: {}",
74-
lock_file.to_string_lossy()
75-
);
76-
77-
for _ in 0..30 {
78-
let lock = std::fs::OpenOptions::new()
79-
.create(true)
80-
.truncate(true)
81-
.write(true)
82-
.open(&lock_file);
83-
84-
match lock {
85-
Ok(_) => {
86-
debug!("Lock acquired: {}", lock_file.to_string_lossy());
87-
return Ok(lock_file);
88-
}
89-
Err(error) => {
90-
warn!("unable to acquire lock: {error}");
91-
sleep(Duration::from_secs(1));
92-
}
93-
}
94-
}
95-
96-
Err(Unexpected("Failed to acquire lock".to_string()))
97-
}
98-
9941
/// Extracts the compressed tar `bytes` to the [out_dir](Path).
10042
///
10143
/// # Errors
10244
/// Returns an error if the extraction fails.
10345
#[allow(clippy::cast_precision_loss)]
10446
#[instrument(skip(bytes))]
105-
pub async fn extract(bytes: &Vec<u8>, out_dir: &Path) -> Result<()> {
106-
let input = BufReader::new(Cursor::new(bytes));
107-
let decoder = GzDecoder::new(input);
108-
let mut archive = Archive::new(decoder);
109-
let mut files = 0;
110-
let mut extracted_bytes = 0;
111-
112-
let parent_dir = if let Some(parent) = out_dir.parent() {
113-
parent
114-
} else {
115-
debug!("No parent directory for {}", out_dir.to_string_lossy());
116-
out_dir
117-
};
118-
119-
create_dir_all(parent_dir)?;
120-
121-
let lock_file = acquire_lock(parent_dir)?;
122-
// If the directory already exists, then the archive has already been
123-
// extracted by another process.
124-
if out_dir.exists() {
125-
debug!(
126-
"Directory already exists {}; skipping extraction: ",
127-
out_dir.to_string_lossy()
128-
);
129-
remove_file(&lock_file)?;
130-
return Ok(());
131-
}
132-
133-
let extract_dir = tempfile::tempdir_in(parent_dir)?.into_path();
134-
debug!("Extracting archive to {}", extract_dir.to_string_lossy());
135-
136-
for archive_entry in archive.entries()? {
137-
let mut entry = archive_entry?;
138-
let entry_header = entry.header();
139-
let entry_type = entry_header.entry_type();
140-
let entry_size = entry_header.size()?;
141-
#[cfg(unix)]
142-
let file_mode = entry_header.mode()?;
143-
144-
let entry_header_path = entry_header.path()?.to_path_buf();
145-
let prefix = match entry_header_path.components().next() {
146-
Some(component) => component.as_os_str().to_str().unwrap_or_default(),
147-
None => {
148-
return Err(Unexpected(
149-
"Failed to get file header path prefix".to_string(),
150-
));
151-
}
152-
};
153-
let stripped_entry_header_path = entry_header_path.strip_prefix(prefix)?.to_path_buf();
154-
let mut entry_name = extract_dir.clone();
155-
entry_name.push(stripped_entry_header_path);
156-
157-
if entry_type.is_dir() || entry_name.is_dir() {
158-
create_dir_all(&entry_name)?;
159-
} else if entry_type.is_file() {
160-
let mut output_file = File::create(&entry_name)?;
161-
copy(&mut entry, &mut output_file)?;
162-
163-
files += 1;
164-
extracted_bytes += entry_size;
165-
166-
#[cfg(unix)]
167-
{
168-
use std::os::unix::fs::PermissionsExt;
169-
output_file.set_permissions(std::fs::Permissions::from_mode(file_mode))?;
170-
}
171-
} else if entry_type.is_symlink() {
172-
#[cfg(unix)]
173-
if let Some(symlink_target) = entry.link_name()? {
174-
let symlink_path = entry_name;
175-
std::os::unix::fs::symlink(symlink_target.as_ref(), symlink_path)?;
176-
}
177-
}
178-
}
179-
180-
if out_dir.exists() {
181-
debug!(
182-
"Directory already exists {}; skipping rename and removing extraction directory: {}",
183-
out_dir.to_string_lossy(),
184-
extract_dir.to_string_lossy()
185-
);
186-
remove_dir_all(&extract_dir)?;
187-
} else {
188-
debug!(
189-
"Renaming {} to {}",
190-
extract_dir.to_string_lossy(),
191-
out_dir.to_string_lossy()
192-
);
193-
rename(extract_dir, out_dir)?;
194-
}
195-
196-
if lock_file.is_file() {
197-
debug!("Removing lock file: {}", lock_file.to_string_lossy());
198-
remove_file(lock_file)?;
199-
}
200-
201-
debug!(
202-
"Extracting {} files totalling {}",
203-
files.to_formatted_string(&Locale::en),
204-
human_bytes(extracted_bytes as f64)
205-
);
206-
207-
Ok(())
47+
pub async fn extract(url: &str, bytes: &Vec<u8>, out_dir: &Path) -> Result<()> {
48+
let extractor_fn = extractor::registry::get(url)?;
49+
extractor_fn(bytes, out_dir)
20850
}
20951

21052
#[cfg(test)]

postgresql_archive/src/blocking/archive.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ pub fn get_archive(url: &str, version_req: &VersionReq) -> crate::Result<(Versio
3434
///
3535
/// # Errors
3636
/// Returns an error if the extraction fails.
37-
pub fn extract(bytes: &Vec<u8>, out_dir: &Path) -> crate::Result<()> {
37+
pub fn extract(url: &str, bytes: &Vec<u8>, out_dir: &Path) -> crate::Result<()> {
3838
RUNTIME
3939
.handle()
40-
.block_on(async move { crate::extract(bytes, out_dir).await })
40+
.block_on(async move { crate::extract(url, bytes, out_dir).await })
4141
}

postgresql_archive/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ pub enum Error {
3131
/// Unexpected error
3232
#[error("{0}")]
3333
Unexpected(String),
34+
/// Unsupported extractor
35+
#[error("unsupported extractor for '{0}'")]
36+
UnsupportedExtractor(String),
3437
/// Unsupported hasher
3538
#[error("unsupported hasher for '{0}'")]
3639
UnsupportedHasher(String),
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod registry;
2+
pub mod theseus_postgresql_binary;
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use crate::extractor::theseus_postgresql_binary;
2+
use crate::Error::{PoisonedLock, UnsupportedExtractor};
3+
use crate::{Result, THESEUS_POSTGRESQL_BINARIES_URL};
4+
use lazy_static::lazy_static;
5+
use std::path::Path;
6+
use std::sync::{Arc, Mutex, RwLock};
7+
8+
lazy_static! {
9+
static ref REGISTRY: Arc<Mutex<RepositoryRegistry>> =
10+
Arc::new(Mutex::new(RepositoryRegistry::default()));
11+
}
12+
13+
type SupportsFn = fn(&str) -> Result<bool>;
14+
type ExtractFn = fn(&Vec<u8>, &Path) -> Result<()>;
15+
16+
/// Singleton struct to store extractors
17+
#[allow(clippy::type_complexity)]
18+
struct RepositoryRegistry {
19+
extractors: Vec<(Arc<RwLock<SupportsFn>>, Arc<RwLock<ExtractFn>>)>,
20+
}
21+
22+
impl RepositoryRegistry {
23+
/// Creates a new extractor registry.
24+
fn new() -> Self {
25+
Self {
26+
extractors: Vec::new(),
27+
}
28+
}
29+
30+
/// Registers an extractor. Newly registered extractors take precedence over existing ones.
31+
fn register(&mut self, supports_fn: SupportsFn, extract_fn: ExtractFn) {
32+
self.extractors.insert(
33+
0,
34+
(
35+
Arc::new(RwLock::new(supports_fn)),
36+
Arc::new(RwLock::new(extract_fn)),
37+
),
38+
);
39+
}
40+
41+
/// Gets an extractor that supports the specified URL
42+
///
43+
/// # Errors
44+
/// * If the URL is not supported.
45+
fn get(&self, url: &str) -> Result<ExtractFn> {
46+
for (supports_fn, extractor_fn) in &self.extractors {
47+
let supports_function = supports_fn
48+
.read()
49+
.map_err(|error| PoisonedLock(error.to_string()))?;
50+
if supports_function(url)? {
51+
let extractor_function = extractor_fn
52+
.read()
53+
.map_err(|error| PoisonedLock(error.to_string()))?;
54+
return Ok(*extractor_function);
55+
}
56+
}
57+
58+
Err(UnsupportedExtractor(url.to_string()))
59+
}
60+
}
61+
62+
impl Default for RepositoryRegistry {
63+
/// Creates a new repository registry with the default repositories registered.
64+
fn default() -> Self {
65+
let mut registry = Self::new();
66+
registry.register(
67+
|url| Ok(url.starts_with(THESEUS_POSTGRESQL_BINARIES_URL)),
68+
theseus_postgresql_binary::extract,
69+
);
70+
registry
71+
}
72+
}
73+
74+
/// Registers an extractor. Newly registered extractors take precedence over existing ones.
75+
///
76+
/// # Errors
77+
/// * If the registry is poisoned.
78+
#[allow(dead_code)]
79+
pub fn register(supports_fn: SupportsFn, extractor_fn: ExtractFn) -> Result<()> {
80+
let mut registry = REGISTRY
81+
.lock()
82+
.map_err(|error| PoisonedLock(error.to_string()))?;
83+
registry.register(supports_fn, extractor_fn);
84+
Ok(())
85+
}
86+
87+
/// Gets an extractor that supports the specified URL
88+
///
89+
/// # Errors
90+
/// * If the URL is not supported.
91+
pub fn get(url: &str) -> Result<ExtractFn> {
92+
let registry = REGISTRY
93+
.lock()
94+
.map_err(|error| PoisonedLock(error.to_string()))?;
95+
registry.get(url)
96+
}
97+
98+
#[cfg(test)]
99+
mod tests {
100+
use super::*;
101+
102+
#[test]
103+
fn test_register() -> Result<()> {
104+
register(|url| Ok(url == "https://foo.com"), |_, _| Ok(()))?;
105+
let url = "https://foo.com";
106+
let extractor = get(url)?;
107+
assert!(extractor(&Vec::new(), Path::new("foo")).is_ok());
108+
Ok(())
109+
}
110+
111+
#[test]
112+
fn test_get_error() {
113+
let error = get("foo").unwrap_err();
114+
assert_eq!("unsupported extractor for 'foo'", error.to_string());
115+
}
116+
117+
#[test]
118+
fn test_get_theseus_postgresql_binaries() {
119+
assert!(get(THESEUS_POSTGRESQL_BINARIES_URL).is_ok());
120+
}
121+
}

0 commit comments

Comments
 (0)