Skip to content

Commit

Permalink
fix(wasm): clean up download, byte handling and hashing (#379)
Browse files Browse the repository at this point in the history
* fix(wasm): clean up download, byte handling and hashing

---------

Co-authored-by: Bo Lu <lv.patrick@gmail.com>
  • Loading branch information
akacase and burmecia authored Dec 10, 2024
1 parent 1a3cbd3 commit 2ea964b
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 52 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ wasm_fdw = [
"serde",
"serde_json",
"jwt-simple",
"bytes",
]
# Does not include helloworld_fdw because of its general uselessness
native_fdws = [
Expand Down Expand Up @@ -241,7 +242,7 @@ jwt-simple = { version = "0.12.9", default-features = false, features = [
dirs = { version = "5.0.1", optional = true }
sha2 = { version = "0.10.8", optional = true }
hex = { version = "0.4.3", optional = true }

bytes = { version = "1.9.0", optional = true }
thiserror = { version = "1.0.48", optional = true }
anyhow = { version = "1.0.81", optional = true }

Expand Down
1 change: 1 addition & 0 deletions wrappers/src/fdw/wasm_fdw/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This is Wasm foreign data wrapper host, please visit each Wasm foreign data wrap

| Version | Date | Notes |
| ------- | ---------- | ---------------------------------------------------- |
| 0.1.4 | 2024-12-09 | Improve remote wasm downloading and caching |
| 0.1.3 | 2024-09-30 | Support for pgrx 0.12.6 |
| 0.1.2 | 2024-07-07 | Add fdw_package_checksum server option |
| 0.1.1 | 2024-07-05 | Fix missing wasm package cache dir issue |
Expand Down
171 changes: 123 additions & 48 deletions wrappers/src/fdw/wasm_fdw/wasm_fdw.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use bytes::Bytes;
use pgrx::pg_sys;
use semver::{Version, VersionReq};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use warg_client as warg;
use wasmtime::component::*;
use wasmtime::{Config, Engine, Store};
Expand Down Expand Up @@ -41,8 +43,6 @@ fn load_component_from_file(
Component::from_file(engine, file_path).map_err(|_| WasmFdwError::InvalidWasmComponent)
}

// Download wasm component package from warg registry or custom url.
// The URL protocol can be 'file://', 'warg(s)://' or 'http(s)://'.
fn download_component(
rt: &Runtime,
engine: &Engine,
Expand All @@ -51,78 +51,153 @@ fn download_component(
version: &str,
checksum: Option<&str>,
) -> WasmFdwResult<Component> {
// handle local file paths
if let Some(file_path) = url.strip_prefix("file://") {
return load_component_from_file(engine, file_path);
}

// handle warg registry URLs
if url.starts_with("warg://") || url.starts_with("wargs://") {
let url = url
.replacen("warg://", "http://", 1)
.replacen("wargs://", "https://", 1);

// download from warg registry
let config = warg::Config {
disable_interactive: true,
..Default::default()
};
let client = rt.block_on(warg::FileSystemClient::new_with_config(
Some(&url),
&config,
None,
))?;

let pkg_name = warg_protocol::registry::PackageName::new(name)?;
let ver = semver::VersionReq::parse(version)?;
let pkg = rt
.block_on(client.download(&pkg_name, &ver))?
.ok_or(format!("{}@{} not found on {}", name, version, url))?;

return load_component_from_file(engine, pkg.path);
return download_from_warg(rt, engine, url, name, version);
}

// otherwise, download from custom url if it is not in local cache
// handle direct URLs with caching
download_from_url(rt, engine, url, name, version, checksum)
}

// Download a component package from a warg registry and load it.
//
// `url` is the registry address with a `warg://` or `wargs://` scheme; the
// scheme is rewritten to `http://` or `https://` respectively before the warg
// client is created. `name` is the package name and `version` is a semver
// version requirement.
//
// Returns an error when the client cannot be created, when `name` or
// `version` does not parse, when the download fails, when no matching package
// is found, or when the downloaded file cannot be loaded as a component.
fn download_from_warg(
    rt: &Runtime,
    engine: &Engine,
    url: &str,
    name: &str,
    version: &str,
) -> WasmFdwResult<Component> {
    // Rewrite only the leading scheme. A plain substring replacement would
    // also rewrite a later "warg://" or "wargs://" that happens to appear
    // inside the path or query part of the registry URL.
    let url = if let Some(rest) = url.strip_prefix("wargs://") {
        format!("https://{}", rest)
    } else if let Some(rest) = url.strip_prefix("warg://") {
        format!("http://{}", rest)
    } else {
        url.to_owned()
    };

    // never prompt: there is no interactive terminal to answer
    let config = warg::Config {
        disable_interactive: true,
        ..Default::default()
    };

    let client = rt.block_on(warg::FileSystemClient::new_with_config(
        Some(&url),
        &config,
        None,
    ))?;

    let pkg_name = warg_protocol::registry::PackageName::new(name)
        .map_err(|e| format!("invalid package name '{}': {}", name, e))?;

    let ver = semver::VersionReq::parse(version)
        .map_err(|e| format!("invalid version requirement '{}': {}", version, e))?;

    // `download` yields `None` when nothing satisfies the version requirement
    let pkg = rt
        .block_on(client.download(&pkg_name, &ver))?
        .ok_or_else(|| format!("{}@{} not found on {}", name, version, url))?;

    load_component_from_file(engine, pkg.path)
}

// Fetch a component from a plain URL, going through the local file cache.
//
// Order of operations:
//   1. parse `url`; a malformed URL is an error
//   2. derive the cache file path from the parsed URL, `name` and `version`
//   3. if the cache file exists and loads as a component, return it;
//      if it exists but does not load, delete it and continue
//   4. require `checksum`, download the bytes and verify them
//   5. write the bytes to the cache file and load the component from it;
//      if that load fails the cache file is deleted again
fn download_from_url(
    rt: &Runtime,
    engine: &Engine,
    url: &str,
    name: &str,
    version: &str,
    checksum: Option<&str>,
) -> WasmFdwResult<Component> {
    let parsed_url: reqwest::Url = match url.parse() {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid URL '{}': {}", url, e).into()),
    };

    let cache_path = get_cache_path(parsed_url.as_str(), name, version)?;

    // cache hit: use it when it loads, otherwise drop the unusable file
    if cache_path.exists() {
        match load_component_from_file(engine, &cache_path) {
            Ok(cached) => return Ok(cached),
            Err(_) => {
                let _ = fs::remove_file(&cache_path);
            }
        }
    }

    // a download is needed from here on, so the checksum is mandatory
    let Some(expected) = checksum else {
        return Err("package checksum must be specified for remote downloads"
            .to_string()
            .into());
    };

    let payload = download_and_verify(rt, parsed_url, expected)?;
    save_to_cache(&cache_path, &payload)?;

    // do not leave a file behind that cannot be loaded as a component
    let loaded = load_component_from_file(engine, &cache_path);
    if loaded.is_err() {
        let _ = fs::remove_file(&cache_path);
    }
    loaded
}

// calculate file name hash and make up cache path
//
// The file name is the hex-encoded SHA-256 digest of the string
// "<user id>:<url>:<name>@<version>", placed under `dirs::cache_dir()` with
// the extension "wasm".
//
// NOTE(review): this span is a rendered diff in which removed and added lines
// appear together without +/- markers. The `expect("no cache dir found")`
// line and the `if !path.exists() { ... let option_checksum ...` lines look
// like the pre-change code; the `ok_or_else(...)` line and `Ok(path)` look
// like their replacements. Confirm against the actual file before relying on
// either reading.
fn get_cache_path(url: &str, name: &str, version: &str) -> WasmFdwResult<PathBuf> {
// the value of `pg_sys::GetUserId()` is part of the hash input, so the same
// url/name/version gives a different file name for a different user id
let hash = Sha256::digest(format!(
"{}:{}:{}@{}",
unsafe { pg_sys::GetUserId().as_u32() },
url,
name,
version
));

let file_name = hex::encode(hash);
let mut path = dirs::cache_dir().expect("no cache dir found");
// a missing cache directory is reported as an error value here
let mut path = dirs::cache_dir().ok_or_else(|| "no cache directory found".to_string())?;

path.push(file_name);
path.set_extension("wasm");

if !path.exists() {
// package checksum must be specified
let option_checksum = checksum.ok_or("package checksum option not specified".to_owned())?;
Ok(path)
}

// download component wasm from remote and check its checksum
let resp = rt.block_on(reqwest::get(url))?;
let bytes = rt.block_on(resp.bytes())?;
let bytes_checksum = hex::encode(Sha256::digest(&bytes));
if bytes_checksum != option_checksum {
return Err("package checksum not match".to_string().into());
}
// Download the body at `url` and check it against `expected_checksum`.
//
// Visible steps: issue `reqwest::get` for the URL (driven to completion with
// `rt.block_on`), reject any non-success HTTP status, read the whole body,
// hex-encode its SHA-256 digest and compare it with `expected_checksum`.
// On a match the downloaded bytes are returned.
//
// Every failure is returned as a fixed message string; the underlying error
// from reqwest is discarded by the `map_err(|_| ...)` calls.
//
// NOTE(review): this span is a rendered diff in which removed and added lines
// appear together without +/- markers. The "save the component wasm to local
// cache" lines and the `load_component_from_file(engine, &path)` lines use
// `path` and `engine`, which are not parameters of this function, so they
// look like removed pre-change code. Confirm against the actual file.
fn download_and_verify(
rt: &Runtime,
url: reqwest::Url,
expected_checksum: &str,
) -> WasmFdwResult<Bytes> {
let resp = rt
.block_on(reqwest::get(url.clone()))
.map_err(|_| "failed to download component".to_string())?;

// a response was received, but its status is not a success status
if !resp.status().is_success() {
return Err("component download failed - server error"
.to_string()
.into());
}

// save the component wasm to local cache
if let Some(parent) = path.parent() {
// create all parent directories if they do not exist
fs::create_dir_all(parent)?;
}
fs::write(&path, bytes)?;
let bytes = rt
.block_on(resp.bytes())
.map_err(|_| "failed to read component data".to_string())?;

// exact string comparison against the hex-encoded digest
// NOTE(review): presumably `hex::encode` emits lowercase digits, in which
// case an uppercase `expected_checksum` would be rejected — confirm
let actual_checksum = hex::encode(Sha256::digest(&bytes));
if actual_checksum != expected_checksum {
return Err("component verification failed".to_string().into());
}

load_component_from_file(engine, &path).inspect_err(|_| {
// remove the cache file if it cannot be loaded as component
let _ = fs::remove_file(&path);
})
Ok(bytes)
}

// Write `bytes` to the cache file at `path`.
//
// Missing parent directories are created first. A failure to create them is
// reported as "cache access error"; a failure to write the file is reported
// as "cache write error". The underlying I/O error is not included.
fn save_to_cache(path: &Path, bytes: &[u8]) -> WasmFdwResult<()> {
    if let Some(cache_dir) = path.parent() {
        if fs::create_dir_all(cache_dir).is_err() {
            return Err(String::from("cache access error").into());
        }
    }

    match fs::write(path, bytes) {
        Ok(()) => Ok(()),
        Err(_) => Err(String::from("cache write error").into()),
    }
}

#[wrappers_fdw(
version = "0.1.3",
version = "0.1.4",
author = "Supabase",
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/wasm_fdw",
error_type = "WasmFdwError"
Expand Down

0 comments on commit 2ea964b

Please sign in to comment.