Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.115"
serde_with = "3.7.0"
tokio = { version = "1.37.0", features = ["full"] }
# TODO - turn into optional (feature) module.
polars = { version = "0.38.3", features = ["polars-io", "json"]}
maplit = "1.0.2"
5 changes: 4 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde::de::DeserializeOwned;
use serde_json::json;
use std::collections::HashMap;
use std::env;
use std::fmt::Debug;
use tokio::time::{sleep, Duration};

const BASE_URL: &str = "https://api.dune.com/api/v1";
Expand Down Expand Up @@ -140,6 +141,7 @@ impl DuneClient {
._get(job_id, "results")
.await
.map_err(DuneRequestError::from)?;
debug!("Raw Response {:?}", response);
DuneClient::_parse_response::<GetResultResponse<T>>(response).await
}

Expand Down Expand Up @@ -185,7 +187,7 @@ impl DuneClient {
/// Ok(())
/// }
/// ```
pub async fn refresh<T: DeserializeOwned>(
pub async fn refresh<T: DeserializeOwned + Debug>(
&self,
query_id: u32,
parameters: Option<Vec<Parameter>>,
Expand All @@ -203,6 +205,7 @@ impl DuneClient {
status = self.get_status(&job_id).await?
}
let full_response = self.get_results::<T>(&job_id).await;
debug!("Full Response {:?}", full_response);
if status.state == ExecutionStatus::Failed {
warn!(
"{:?} Perhaps your query took too long to run!",
Expand Down
66 changes: 66 additions & 0 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::fmt::Debug;
use crate::{client::DuneClient, error::DuneRequestError, parameters::Parameter};
use polars::{
frame::DataFrame,
prelude::{JsonReader, SerReader},
};
use serde::{de::DeserializeOwned, Serialize};
use std::io::Cursor;

impl DuneClient {
pub async fn fetch_as_dataframe<T: DeserializeOwned + Serialize + Debug>(
&self,
query_id: u32,
parameters: Option<Vec<Parameter>>,
ping_frequency: Option<u64>,
) -> Result<DataFrame, DuneRequestError> {
let results = self
.refresh::<T>(query_id, parameters, ping_frequency)
.await?;
let json = serde_json::to_string(&results.get_rows()).map_err(DuneRequestError::from)?;

let cursor = Cursor::new(json);

Ok(JsonReader::new(cursor).finish()?)
}
}

#[cfg(test)]
mod tests {
use crate::{client::DuneClient, parse_utils::datetime_from_str};
use chrono::{DateTime, Utc};
use polars::export::ahash::HashMap;
use serde::{Deserialize, Serialize};

#[tokio::test]
async fn fetch_as_dataframe() {
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct ResultStruct {
text_field: String,
number_field: f64,
#[serde(deserialize_with = "datetime_from_str")]
date_field: DateTime<Utc>,
list_field: String,
}

let dune = DuneClient::from_env();
// Response here should be: http://jsonblob.com/1226634378817167360
// {"execution_id":"01HTX4625Y4PA8CHCZWSB5NA0F","query_id":1215383,"is_execution_finished":true,"state":"QUERY_STATE_COMPLETED","submitted_at":"2024-04-07T20:32:19.134841Z","expires_at":"2024-07-06T20:32:19.553234Z","execution_started_at":"2024-04-07T20:32:19.406404Z","execution_ended_at":"2024-04-07T20:32:19.553232Z","result":{"rows":[{"date_field":"2022-05-04 00:00:00.000","list_field":"Option 1","number_field":"3.1415926535","text_field":"Plain Text"}],"metadata":{"column_names":["text_field","number_field","date_field","list_field"],"row_count":1,"result_set_bytes":103,"total_row_count":1,"total_result_set_bytes":103,"datapoint_count":4,"pending_time_millis":271,"execution_time_millis":146}}}
let df = dune
.fetch_as_dataframe::<ResultStruct>(1215383, None, None)
.await
.unwrap();
println!("{:?}", df);
}

#[tokio::test]
async fn lazy_fetch_as_dataframe() {
let dune = DuneClient::from_env();
// This query is a fork of the above, with all string fields.
let df = dune
.fetch_as_dataframe::<HashMap<String, String>>(1832271, None, None)
.await
.unwrap();
println!("{:?}", df);
}
}
43 changes: 40 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ pub enum DuneRequestError {
Dune(String),
/// Errors bubbled up from reqwest::Error
Request(String),
/// Errors bubbled up from Serde (de)serialization
Serde(String),
/// Errors bubbled up from PolarsError
Polars(String),
}

impl From<DuneError> for DuneRequestError {
Expand All @@ -30,22 +34,55 @@ impl From<reqwest::Error> for DuneRequestError {
}
}

impl From<serde_json::Error> for DuneRequestError {
fn from(value: serde_json::Error) -> Self {
DuneRequestError::Serde(value.to_string())
}
}

impl From<polars::error::PolarsError> for DuneRequestError {
fn from(value: polars::error::PolarsError) -> Self {
DuneRequestError::Polars(value.to_string())
}
}

#[cfg(test)]
mod tests {
use super::*;
use maplit::hashmap;
use serde::Serialize;

#[tokio::test]
async fn error_parsing() {
let err = reqwest::get("invalid-url").await.unwrap_err();
async fn async_error_parsing() {
let request_error = reqwest::get("invalid-url").await.unwrap_err();
assert_eq!(
DuneRequestError::from(err),
DuneRequestError::from(request_error),
DuneRequestError::Request("builder error".to_string())
);
}
#[test]
fn standard_error_parsing() {
assert_eq!(
DuneRequestError::from(DuneError {
error: "broken".to_string()
}),
DuneRequestError::Dune("broken".to_string())
);

// An example where serde_json::to_string fails.
// https://www.greyblake.com/blog/when-serde-json-to-string-fails/
#[derive(Serialize, Eq, PartialEq, Hash)]
#[serde(tag = "t")]
enum TestEnum {
Item,
}
let serde_error = serde_json::to_string(&hashmap! {
TestEnum::Item => 2,
})
.unwrap_err();
assert_eq!(
DuneRequestError::from(serde_error),
DuneRequestError::Serde("key must be a string".to_string())
)
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/// DuneClient structure and all API route implementations.
pub mod client;
/// Fetches and parses Dune Query results as Dataframe
pub mod dataframe;
/// DuneRequestError (encapsulating all errors that could arise within network requests and result parsing)
pub mod error;
/// Content related to Query Parameters.
Expand Down
Loading