Skip to content

Commit

Permalink
add data modify to paddle fdw
Browse files Browse the repository at this point in the history
  • Loading branch information
burmecia committed Jun 12, 2024
1 parent 49a5cd2 commit 8e849a6
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 117 deletions.
391 changes: 309 additions & 82 deletions wasm-wrappers/fdw/paddle_fdw/src/bindings.rs

Large diffs are not rendered by default.

82 changes: 71 additions & 11 deletions wasm-wrappers/fdw/paddle_fdw/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[allow(warnings)]
mod bindings;
use serde_json::{json, Value as JsonValue};
use serde_json::{json, Map as JsonMap, Value as JsonValue};

use bindings::{
exports::supabase::wrappers::routines::Guest,
Expand All @@ -19,6 +19,7 @@ struct PaddleFdw {
object: String,
src_rows: Vec<JsonValue>,
src_idx: usize,
rowid_col: String,
}

static mut INSTANCE: *mut PaddleFdw = std::ptr::null_mut::<PaddleFdw>();
Expand Down Expand Up @@ -100,7 +101,7 @@ impl PaddleFdw {
return Ok(());
}

http::error_for_status(&resp)?;
http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?;

// save source rows
self.src_rows = resp_json
Expand Down Expand Up @@ -189,11 +190,38 @@ impl PaddleFdw {
None
}
}
TypeOid::Json => src.as_str().map(|v| Cell::Json(v.to_owned())),
TypeOid::Json => src.as_object().map(|_| Cell::Json(src.to_string())),
};

Ok(cell)
}

// convert a row to JSON string, which is used as request body for row update
fn row_to_body(&self, row: &Row) -> Result<String, FdwError> {
let mut map = JsonMap::new();

for (col_name, cell) in row.cols().iter().zip(row.cells().iter()) {
if let Some(cell) = cell {
let value = match cell {
Cell::Bool(v) => JsonValue::Bool(*v),
Cell::I64(v) => JsonValue::String(v.to_string()),
Cell::String(v) => JsonValue::String(v.to_string()),
Cell::Date(v) => JsonValue::String(time::epoch_ms_to_rfc3339(v * 1_000_000)?),
Cell::Timestamp(v) => JsonValue::String(time::epoch_ms_to_rfc3339(*v)?),
Cell::Timestamptz(v) => JsonValue::String(time::epoch_ms_to_rfc3339(*v)?),
Cell::Json(v) => {
serde_json::from_str::<JsonValue>(v).map_err(|e| e.to_string())?
}
_ => {
return Err(format!("column '{}' type is not supported", col_name));
}
};
map.insert(col_name.to_owned(), value);
}
}

Ok(JsonValue::Object(map).to_string())
}
}

impl Guest for PaddleFdw {
Expand Down Expand Up @@ -283,24 +311,56 @@ impl Guest for PaddleFdw {
Ok(())
}

fn begin_modify(_ctx: &Context) -> FdwResult {
unimplemented!("update on foreign table is not supported");
fn begin_modify(ctx: &Context) -> FdwResult {
let this = Self::this_mut();
let opts = ctx.get_options(OptionsType::Table);
this.object = opts.require("object")?;
this.rowid_col = opts.require("rowid_column")?;
Ok(())
}

fn insert(_ctx: &Context, _row: &Row) -> FdwResult {
unimplemented!("update on foreign table is not supported");
fn insert(_ctx: &Context, row: &Row) -> FdwResult {
let this = Self::this_mut();
let url = format!("{}/{}", this.base_url, this.object);
let body = this.row_to_body(row)?;
let req = http::Request {
method: http::Method::Post,
url,
headers: this.headers.clone(),
body,
};
let resp = http::post(&req)?;
http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?;
stats::inc_stats(FDW_NAME, stats::Metric::RowsOut, 1);
Ok(())
}

fn update(_ctx: &Context, _rowid: Cell, _row: &Row) -> FdwResult {
unimplemented!("update on foreign table is not supported");
fn update(_ctx: &Context, rowid: Cell, row: &Row) -> FdwResult {
let this = Self::this_mut();
let id = match rowid {
Cell::String(s) => s.clone(),
_ => return Err("invalid rowid column value".to_string()),
};
let url = format!("{}/{}/{}", this.base_url, this.object, id);
let body = this.row_to_body(row)?;
let req = http::Request {
method: http::Method::Patch,
url,
headers: this.headers.clone(),
body,
};
let resp = http::patch(&req)?;
http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?;
stats::inc_stats(FDW_NAME, stats::Metric::RowsOut, 1);
Ok(())
}

fn delete(_ctx: &Context, _rowid: Cell) -> FdwResult {
unimplemented!("update on foreign table is not supported");
unimplemented!("delete on foreign table is not supported");
}

fn end_modify(_ctx: &Context) -> FdwResult {
unimplemented!("update on foreign table is not supported");
Ok(())
}
}

Expand Down
4 changes: 4 additions & 0 deletions wasm-wrappers/wit/http.wit
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ interface http {
variant method {
get,
post,
put,
patch,
delete,
}

record request {
Expand All @@ -26,6 +29,7 @@ interface http {
get: func(req: request) -> http-result;
post: func(req: request) -> http-result;
put: func(req: request) -> http-result;
patch: func(req: request) -> http-result;
delete: func(req: request) -> http-result;

error-for-status: func(resp: response) -> result<_, http-error>;
Expand Down
3 changes: 3 additions & 0 deletions wasm-wrappers/wit/time.wit
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ interface time {
// parse string from an user-specified format to microseconds since Unix epoch
parse-from-str: func(s: string, fmt: string) -> time-result;

// convert microseconds since Unix epoch to RFC3339 string
epoch-ms-to-rfc3339: func(msecs: s64) -> result<string, time-error>;

// sleep for a while
sleep: func(millis: u64);
}
58 changes: 34 additions & 24 deletions wrappers/src/fdw/wasm_fdw/host/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,27 @@ fn create_client(req: &http::Request) -> Result<ClientWithMiddleware, String> {
}

impl FdwHost {
// make a http request
fn http_request(&mut self, req: http::Request) -> http::HttpResult {
supabase_wrappers::prelude::report_info(&format!("req: {:?}", req));
let client = create_client(&req)?;
let resp = self
.rt
.block_on(
match req.method {
http::Method::Get => client.get(req.url),
http::Method::Post => client.post(req.url),
http::Method::Put => client.put(req.url),
http::Method::Patch => client.patch(req.url),
http::Method::Delete => client.delete(req.url),
}
.body(req.body)
.send(),
)
.map_err(|e| e.to_string())?;
self.convert_to_guest_response(resp)
}

// convert reqwest response to guest response
fn convert_to_guest_response(&mut self, resp: Response) -> http::HttpResult {
let url = resp.url().to_string();
Expand All @@ -64,40 +85,29 @@ impl FdwHost {
}

impl http::Host for FdwHost {
#[inline]
fn get(&mut self, req: http::Request) -> http::HttpResult {
let client = create_client(&req)?;
let resp = self
.rt
.block_on(client.get(req.url).send())
.map_err(|e| e.to_string())?;
self.convert_to_guest_response(resp)
self.http_request(req)
}

#[inline]
fn post(&mut self, req: http::Request) -> http::HttpResult {
let client = create_client(&req)?;
let resp = self
.rt
.block_on(client.post(req.url).body(req.body).send())
.map_err(|e| e.to_string())?;
self.convert_to_guest_response(resp)
self.http_request(req)
}

#[inline]
fn put(&mut self, req: http::Request) -> http::HttpResult {
let client = create_client(&req)?;
let resp = self
.rt
.block_on(client.put(req.url).body(req.body).send())
.map_err(|e| e.to_string())?;
self.convert_to_guest_response(resp)
self.http_request(req)
}

#[inline]
fn patch(&mut self, req: http::Request) -> http::HttpResult {
self.http_request(req)
}

#[inline]
fn delete(&mut self, req: http::Request) -> http::HttpResult {
let client = create_client(&req)?;
let resp = self
.rt
.block_on(client.delete(req.url).send())
.map_err(|e| e.to_string())?;
self.convert_to_guest_response(resp)
self.http_request(req)
}

fn error_for_status(&mut self, resp: http::Response) -> Result<(), http::HttpError> {
Expand Down
6 changes: 6 additions & 0 deletions wrappers/src/fdw/wasm_fdw/host/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ impl time::Host for FdwHost {
.map_err(|e| e.to_string())
}

fn epoch_ms_to_rfc3339(&mut self, msecs: i64) -> Result<String, time::TimeError> {
DateTime::from_timestamp_micros(msecs)
.map(|ts| ts.to_rfc3339())
.ok_or("invalid microseconds since Unix Epoch".to_string())
}

fn sleep(&mut self, millis: u64) {
std::thread::sleep(std::time::Duration::from_millis(millis));
}
Expand Down

0 comments on commit 8e849a6

Please sign in to comment.