Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose http object store #885

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ uuid = { version = "1.9", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
async-trait = "0.1"
futures = "0.3"
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure"] }
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
parking_lot = "0.12"
regex-syntax = "0.8"
syn = "2.0.79"
Expand Down
4 changes: 3 additions & 1 deletion python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ def __init__(

self.ctx = SessionContextInternal(config, runtime)

def register_object_store(self, schema: str, store: Any, host: str | None) -> None:
def register_object_store(
self, schema: str, store: Any, host: str | None = None
) -> None:
"""Add a new object store into the session.
Args:
Expand Down
12 changes: 2 additions & 10 deletions python/datafusion/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@
GoogleCloud = object_store.GoogleCloud
LocalFileSystem = object_store.LocalFileSystem
MicrosoftAzure = object_store.MicrosoftAzure
Http = object_store.Http

__all__ = [
"AmazonS3",
"GoogleCloud",
"LocalFileSystem",
"MicrosoftAzure",
]


def __getattr__(name):
return getattr(object_store, name)
__all__ = ["AmazonS3", "GoogleCloud", "LocalFileSystem", "MicrosoftAzure", "Http"]
12 changes: 10 additions & 2 deletions python/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from pyarrow.csv import write_csv
import pyarrow.dataset as ds
import pytest
from datafusion.object_store import LocalFileSystem
from datafusion.object_store import Http

from datafusion import udf, col

Expand Down Expand Up @@ -139,6 +139,15 @@ def test_register_csv_list(ctx, tmp_path):
assert int_sum == 2 * sum(int_values)


def test_register_http_csv(ctx):
    """Query a CSV file served over plain HTTP via the Http object store."""
    # Public test fixture hosted in the ibis-project testing-data repository.
    url = "https://raw.githubusercontent.com/ibis-project/testing-data/refs/heads/master/csv/diamonds.csv"
    ctx.register_object_store("", Http(url))
    ctx.register_csv("remote", url)
    assert ctx.table_exist("remote")
    rows = ctx.sql("SELECT COUNT(*) AS total FROM remote").to_pylist()
    first_row = rows[0]
    assert first_row["total"] > 0


def test_register_parquet(ctx, tmp_path):
path = helpers.write_parquet(tmp_path / "a.parquet", helpers.data())
ctx.register_parquet("t", path)
Expand Down Expand Up @@ -494,7 +503,6 @@ def test_register_listing_table(

dir_root = f"file://{dir_root}/" if path_to_str else dir_root

ctx.register_object_store("file://local", LocalFileSystem(), None)
ctx.register_listing_table(
"my_table",
dir_root,
Expand Down
10 changes: 2 additions & 8 deletions python/tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,15 @@
# under the License.

import os

import pytest

from datafusion import SessionContext
from datafusion.object_store import LocalFileSystem


@pytest.fixture
def local():
return LocalFileSystem()
Comment on lines 22 to -27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't blocking for this PR but it appears we have very little test coverage for object store. It's a bit tricky, though, since I'm not sure where there are publicly available test resources that we could use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, we need more test coverage. I'll try to find some files to test it



@pytest.fixture
def ctx():
    """Provide a fresh SessionContext for each test.

    The ``local`` fixture (and its ``LocalFileSystem`` registration) was
    removed, so no explicit object store registration happens here;
    referencing the old ``local`` name would raise a NameError.
    """
    return SessionContext()


Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_wrapper_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None:


def test_datafusion_missing_exports() -> None:
"""Check for any missing pythone exports.
"""Check for any missing python exports.
This test verifies that every exposed class, attribute, and function in
the internal (pyo3) module is also exposed in our python wrappers.
Expand Down
1 change: 1 addition & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl PySessionContext {
StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name),
StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name),
StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()),
StorageContexts::HTTP(http) => (http.store, http.url),
};

// let users override the host to match the api signature from upstream
Expand Down
31 changes: 31 additions & 0 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@ use pyo3::prelude::*;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
use object_store::http::{HttpBuilder, HttpStore};
use object_store::local::LocalFileSystem;
use pyo3::exceptions::PyValueError;
use url::Url;

/// Tagged union of the Python object-store context classes accepted by
/// `SessionContext.register_object_store`.
///
/// The derived `FromPyObject` attempts each variant in declaration order
/// when converting the incoming Python object, so more specific contexts
/// should be listed before more permissive ones.
#[derive(FromPyObject)]
pub enum StorageContexts {
    AmazonS3(PyAmazonS3Context),
    GoogleCloudStorage(PyGoogleCloudContext),
    MicrosoftAzure(PyMicrosoftAzureContext),
    LocalFileSystem(PyLocalFileSystemContext),
    HTTP(PyHttpContext),
}

#[pyclass(name = "LocalFileSystem", module = "datafusion.store", subclass)]
Expand Down Expand Up @@ -219,10 +223,37 @@ impl PyAmazonS3Context {
}
}

#[pyclass(name = "Http", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyHttpContext {
    /// The URL string supplied by the user; kept so the session context
    /// can use it as the host key when registering the store.
    pub url: String,
    /// HTTP-backed object store rooted at the origin of `url`.
    pub store: Arc<HttpStore>,
}

#[pymethods]
impl PyHttpContext {
    /// Build an HTTP object store for the origin (scheme + host + port)
    /// of `url`.
    ///
    /// Raises `ValueError` if the URL cannot be parsed or the store
    /// cannot be constructed. (The previous fallback built an
    /// `HttpBuilder` with no URL and `unwrap()`ed the result, which
    /// panicked instead of surfacing a Python exception.)
    #[new]
    fn new(url: String) -> PyResult<Self> {
        let parsed = Url::parse(url.as_str())
            .map_err(|e| PyValueError::new_err(format!("invalid URL '{}': {}", url, e)))?;

        let store = HttpBuilder::new()
            // Only the origin identifies the store; the path is resolved
            // per-request when objects are fetched.
            .with_url(parsed.origin().ascii_serialization())
            .build()
            .map_err(|e| PyValueError::new_err(e.to_string()))?;

        Ok(Self {
            url,
            store: Arc::new(store),
        })
    }
}

/// Register every object-store context class with the Python submodule.
/// Registration order has no runtime significance; classes are grouped
/// local-first, then cloud providers, then HTTP.
pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyLocalFileSystemContext>()?;
    m.add_class::<PyAmazonS3Context>()?;
    m.add_class::<PyGoogleCloudContext>()?;
    m.add_class::<PyMicrosoftAzureContext>()?;
    m.add_class::<PyHttpContext>()?;
    Ok(())
}
Loading