Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(wasm): clean up download, byte handling and hashing #379

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ wasm_fdw = [
"serde",
"serde_json",
"jwt-simple",
"bytes",
]
# Does not include helloworld_fdw because of its general uselessness
native_fdws = [
Expand Down Expand Up @@ -241,7 +242,7 @@ jwt-simple = { version = "0.12.9", default-features = false, features = [
dirs = { version = "5.0.1", optional = true }
sha2 = { version = "0.10.8", optional = true }
hex = { version = "0.4.3", optional = true }

bytes = { version = "1.9.0", optional = true }
thiserror = { version = "1.0.48", optional = true }
anyhow = { version = "1.0.81", optional = true }

Expand Down
1 change: 1 addition & 0 deletions wrappers/src/fdw/wasm_fdw/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This is Wasm foreign data wrapper host, please visit each Wasm foreign data wrap

| Version | Date | Notes |
| ------- | ---------- | ---------------------------------------------------- |
| 0.1.4 | 2024-12-09 | Improve remote wasm downloading and caching |
| 0.1.3 | 2024-09-30 | Support for pgrx 0.12.6 |
| 0.1.2 | 2024-07-07 | Add fdw_package_checksum server option |
| 0.1.1 | 2024-07-05 | Fix missing wasm package cache dir issue |
Expand Down
171 changes: 123 additions & 48 deletions wrappers/src/fdw/wasm_fdw/wasm_fdw.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use bytes::Bytes;
use pgrx::pg_sys;
use semver::{Version, VersionReq};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use warg_client as warg;
use wasmtime::component::*;
use wasmtime::{Config, Engine, Store};
Expand Down Expand Up @@ -41,8 +43,6 @@ fn load_component_from_file(
Component::from_file(engine, file_path).map_err(|_| WasmFdwError::InvalidWasmComponent)
}

// Download wasm component package from warg registry or custom url.
// The url protoal can be 'file://', 'warg(s)://' or 'http(s)://'.
fn download_component(
rt: &Runtime,
engine: &Engine,
Expand All @@ -51,78 +51,153 @@ fn download_component(
version: &str,
checksum: Option<&str>,
) -> WasmFdwResult<Component> {
// handle local file paths
if let Some(file_path) = url.strip_prefix("file://") {
return load_component_from_file(engine, file_path);
}

// handle warg registry URLs
if url.starts_with("warg://") || url.starts_with("wargs://") {
let url = url
.replacen("warg://", "http://", 1)
.replacen("wargs://", "https://", 1);

// download from warg registry
let config = warg::Config {
disable_interactive: true,
..Default::default()
};
let client = rt.block_on(warg::FileSystemClient::new_with_config(
Some(&url),
&config,
None,
))?;

let pkg_name = warg_protocol::registry::PackageName::new(name)?;
let ver = semver::VersionReq::parse(version)?;
let pkg = rt
.block_on(client.download(&pkg_name, &ver))?
.ok_or(format!("{}@{} not found on {}", name, version, url))?;

return load_component_from_file(engine, pkg.path);
return download_from_warg(rt, engine, url, name, version);
}

// otherwise, download from custom url if it is not in local cache
// handle direct URLs with caching
download_from_url(rt, engine, url, name, version, checksum)
}

fn download_from_warg(
rt: &Runtime,
engine: &Engine,
url: &str,
name: &str,
version: &str,
) -> WasmFdwResult<Component> {
let url = url
.replacen("warg://", "http://", 1)
.replacen("wargs://", "https://", 1);

let config = warg::Config {
disable_interactive: true,
..Default::default()
};

let client = rt.block_on(warg::FileSystemClient::new_with_config(
Some(&url),
&config,
None,
))?;

let pkg_name = warg_protocol::registry::PackageName::new(name)
.map_err(|e| format!("invalid package name '{}': {}", name, e))?;

let ver = semver::VersionReq::parse(version)
.map_err(|e| format!("invalid version requirement '{}': {}", version, e))?;

let pkg = rt
.block_on(client.download(&pkg_name, &ver))?
.ok_or_else(|| format!("{}@{} not found on {}", name, version, url))?;

load_component_from_file(engine, pkg.path)
}

fn download_from_url(
rt: &Runtime,
engine: &Engine,
url: &str,
name: &str,
version: &str,
checksum: Option<&str>,
) -> WasmFdwResult<Component> {
// validate URL
let url = url
.parse::<reqwest::Url>()
.map_err(|e| format!("invalid URL '{}': {}", url, e))?;

// calculate cache path
let cache_path = get_cache_path(url.as_str(), name, version)?;

// return cached component if it exists and is valid
if cache_path.exists() {
if let Ok(component) = load_component_from_file(engine, &cache_path) {
return Ok(component);
}
// if loading fails, remove invalid cache file
let _ = fs::remove_file(&cache_path);
}

// ensure checksum is provided for remote downloads
let checksum = checksum
.ok_or_else(|| "package checksum must be specified for remote downloads".to_string())?;

// download and verify component
let bytes = download_and_verify(rt, url, checksum)?;

// save to cache
save_to_cache(&cache_path, &bytes)?;

// load component
load_component_from_file(engine, &cache_path).inspect_err(|_| {
let _ = fs::remove_file(&cache_path);
})
}

// calculate file name hash and make up cache path
fn get_cache_path(url: &str, name: &str, version: &str) -> WasmFdwResult<PathBuf> {
let hash = Sha256::digest(format!(
"{}:{}:{}@{}",
unsafe { pg_sys::GetUserId().as_u32() },
url,
name,
version
));

let file_name = hex::encode(hash);
let mut path = dirs::cache_dir().expect("no cache dir found");
let mut path = dirs::cache_dir().ok_or_else(|| "no cache directory found".to_string())?;

path.push(file_name);
path.set_extension("wasm");

if !path.exists() {
// package checksum must be specified
let option_checksum = checksum.ok_or("package checksum option not specified".to_owned())?;
Ok(path)
}

// download component wasm from remote and check its checksum
let resp = rt.block_on(reqwest::get(url))?;
let bytes = rt.block_on(resp.bytes())?;
let bytes_checksum = hex::encode(Sha256::digest(&bytes));
if bytes_checksum != option_checksum {
return Err("package checksum not match".to_string().into());
}
fn download_and_verify(
rt: &Runtime,
url: reqwest::Url,
expected_checksum: &str,
) -> WasmFdwResult<Bytes> {
let resp = rt
.block_on(reqwest::get(url.clone()))
.map_err(|_| "failed to download component".to_string())?;

if !resp.status().is_success() {
return Err("component download failed - server error"
.to_string()
.into());
}

// save the component wasm to local cache
if let Some(parent) = path.parent() {
// create all parent directories if they do not exist
fs::create_dir_all(parent)?;
}
fs::write(&path, bytes)?;
let bytes = rt
.block_on(resp.bytes())
.map_err(|_| "failed to read component data".to_string())?;

let actual_checksum = hex::encode(Sha256::digest(&bytes));
if actual_checksum != expected_checksum {
return Err("component verification failed".to_string().into());
}

load_component_from_file(engine, &path).inspect_err(|_| {
// remove the cache file if it cannot be loaded as component
let _ = fs::remove_file(&path);
})
Ok(bytes)
}

fn save_to_cache(path: &Path, bytes: &[u8]) -> WasmFdwResult<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).map_err(|_| "cache access error".to_string())?;
}

fs::write(path, bytes).map_err(|_| "cache write error".to_string())?;

Ok(())
}

#[wrappers_fdw(
version = "0.1.3",
version = "0.1.4",
author = "Supabase",
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/wasm_fdw",
error_type = "WasmFdwError"
Expand Down
Loading