Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ arrow-ord = "57.1"
arrow-schema = "57.1"
arrow-select = "57.1"
arrow-string = "57.1"
async-compat = "0.2.5"
async-fs = "2.2.0"
async-lock = "3.4"
async-stream = "0.3.6"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/duckdb-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl DuckClient {
}?;

let connection = db.connect()?;
vortex_duckdb::register_table_functions(&connection)?;
vortex_duckdb::initialize(&db)?;

// Enable Parquet metadata cache for all benchmark runs.
//
Expand Down
20 changes: 20 additions & 0 deletions docs/user-guide/duckdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ without it, DuckDB defaults to CSV.
COPY (SELECT * FROM my_table) TO 'output.vortex' (FORMAT vortex);
```

## Extension Options

### `vortex_filesystem`

Controls which filesystem implementation is used for reading and writing Vortex files.

| Value | Description |
|-------|-------------|
| `'vortex'` (default) | Uses Vortex's built-in object store filesystem. Supports `file://` and `s3://` schemes. |
| `'duckdb'` | Uses DuckDB's built-in filesystem, including any filesystem extensions such as `httpfs`. |

```sql
SET vortex_filesystem = 'duckdb';
```

Use `'duckdb'` when you want to leverage DuckDB's filesystem extensions (e.g., `httpfs` for HTTP
or S3 access with DuckDB's credential management). Use `'vortex'` (the default) for direct
object store access via Vortex's own S3 integration, which reads credentials from environment
variables.

## Python

The DuckDB Python client works with `read_vortex` the same way:
Expand Down
4 changes: 2 additions & 2 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ mod common_tests {
use vortex::array::ArrayRef;
use vortex::array::arrow::FromArrowArray;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::ObjectStoreWriter;
use vortex::io::VortexWrite;
use vortex::io::object_store::ObjectStoreWrite;
use vortex::session::VortexSession;

use crate::VortexFormatFactory;
Expand Down Expand Up @@ -123,7 +123,7 @@ mod common_tests {
P: Into<object_store::path::Path>,
{
let array = ArrayRef::from_arrow(batch, false)?;
let mut write = ObjectStoreWriter::new(self.store.clone(), &path.into()).await?;
let mut write = ObjectStoreWrite::new(self.store.clone(), &path.into()).await?;
VX_SESSION
.write_options()
.write(&mut write, array.to_array_stream())
Expand Down
6 changes: 3 additions & 3 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use vortex::file::EOF_SIZE;
use vortex::file::MAX_POSTSCRIPT_SIZE;
use vortex::file::OpenOptionsSessionExt;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::io::file::object_store::ObjectStoreSource;
use vortex::io::object_store::ObjectStoreReadAt;
use vortex::io::session::RuntimeSessionExt;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;
Expand Down Expand Up @@ -261,7 +261,7 @@ impl FileFormat for VortexFormat {
}

// Not cached or invalid - open the file
let reader = Arc::new(ObjectStoreSource::new(
let reader = Arc::new(ObjectStoreReadAt::new(
store,
object.location.clone(),
session.handle(),
Expand Down Expand Up @@ -328,7 +328,7 @@ impl FileFormat for VortexFormat {
Some(metadata) => metadata,
None => {
// Not cached - open the file
let reader = Arc::new(ObjectStoreSource::new(
let reader = Arc::new(ObjectStoreReadAt::new(
store,
object.location.clone(),
session.handle(),
Expand Down
4 changes: 2 additions & 2 deletions vortex-datafusion/src/persistent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ mod tests {
use vortex::array::validity::Validity;
use vortex::buffer::buffer;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::ObjectStoreWriter;
use vortex::io::VortexWrite;
use vortex::io::object_store::ObjectStoreWrite;
use vortex::session::VortexSession;

use crate::common_tests::TestSessionContext;
Expand Down Expand Up @@ -65,7 +65,7 @@ mod tests {
Validity::NonNullable,
)?;

let mut writer = ObjectStoreWriter::new(ctx.store.clone(), &"test.vortex".into()).await?;
let mut writer = ObjectStoreWrite::new(ctx.store.clone(), &"test.vortex".into()).await?;

let summary = session
.write_options()
Expand Down
4 changes: 2 additions & 2 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ mod tests {
use vortex::array::arrow::FromArrowArray;
use vortex::buffer::Buffer;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::ObjectStoreWriter;
use vortex::io::VortexWrite;
use vortex::io::object_store::ObjectStoreWrite;
use vortex::metrics::DefaultMetricsRegistry;
use vortex::scan::Selection;
use vortex::session::VortexSession;
Expand Down Expand Up @@ -540,7 +540,7 @@ mod tests {
let array = ArrayRef::from_arrow(rb, false)?;
let path = Path::parse(path)?;

let mut write = ObjectStoreWriter::new(object_store, &path).await?;
let mut write = ObjectStoreWrite::new(object_store, &path).await?;
let summary = SESSION
.write_options()
.write(&mut write, array.to_array_stream())
Expand Down
4 changes: 2 additions & 2 deletions vortex-datafusion/src/persistent/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use datafusion_common::Result as DFResult;
use object_store::ObjectStore;
use vortex::io::VortexReadAt;
use vortex::io::file::object_store::ObjectStoreSource;
use vortex::io::object_store::ObjectStoreReadAt;
use vortex::io::session::RuntimeSessionExt;
use vortex::session::VortexSession;

Expand Down Expand Up @@ -38,7 +38,7 @@ impl VortexReaderFactory for DefaultVortexReaderFactory {
path: &str,
session: &VortexSession,
) -> DFResult<Arc<dyn VortexReadAt>> {
Ok(Arc::new(ObjectStoreSource::new(
Ok(Arc::new(ObjectStoreReadAt::new(
self.object_store.clone(),
path.into(),
session.handle(),
Expand Down
6 changes: 3 additions & 3 deletions vortex-datafusion/src/persistent/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::file::WriteOptionsSessionExt;
use vortex::file::WriteSummary;
use vortex::io::ObjectStoreWriter;
use vortex::io::VortexWrite;
use vortex::io::object_store::ObjectStoreWrite;
use vortex::session::VortexSession;

pub struct VortexSink {
Expand Down Expand Up @@ -124,9 +124,9 @@ impl FileSink for VortexSink {

let stream_adapter = ArrayStreamAdapter::new(dtype, stream);

let mut object_writer = ObjectStoreWriter::new(object_store, &path)
let mut object_writer = ObjectStoreWrite::new(object_store, &path)
.await
.map_err(|e| exec_datafusion_err!("Failed to create ObjectStoreWriter: {e}"))?;
.map_err(|e| exec_datafusion_err!("Failed to create ObjectStoreWrite: {e}"))?;

let summary = session
.write_options()
Expand Down
2 changes: 2 additions & 0 deletions vortex-duckdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ crate-type = ["staticlib", "cdylib", "rlib"]
[dependencies]
anyhow = { workspace = true }
async-fs = { workspace = true }
async-trait = { workspace = true }
bitvec = { workspace = true }
custom-labels = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
num-traits = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
parking_lot = { workspace = true }
paste = { workspace = true }
tempfile = { workspace = true }
Expand Down
16 changes: 7 additions & 9 deletions vortex-duckdb/cpp/client_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,27 @@
#include <duckdb/main/connection.hpp>
#include <duckdb/storage/object_cache.hpp>

extern "C" duckdb_vx_client_context duckdb_vx_connection_get_client_context(duckdb_connection conn) {
extern "C" duckdb_client_context duckdb_vx_connection_get_client_context(duckdb_connection conn) {
try {
auto connection = reinterpret_cast<duckdb::Connection *>(conn);
return reinterpret_cast<duckdb_vx_client_context>(connection->context.get());
return reinterpret_cast<duckdb_client_context>(connection->context.get());
} catch (...) {
return nullptr;
}
}

extern "C" duckdb_vx_object_cache
duckdb_vx_client_context_get_object_cache(duckdb_vx_client_context context) {
extern "C" duckdb_vx_object_cache duckdb_client_context_get_object_cache(duckdb_client_context ffi_context) {
try {
auto client_context = reinterpret_cast<duckdb::ClientContext *>(context);
auto *context = reinterpret_cast<duckdb::ClientContext *>(ffi_context);
// This is okay because this is a ref to the object cache, this lives as long as the database.
return reinterpret_cast<duckdb_vx_object_cache>(
&duckdb::ObjectCache::GetObjectCache(*client_context));
return reinterpret_cast<duckdb_vx_object_cache>(&duckdb::ObjectCache::GetObjectCache(*context));
} catch (...) {
return nullptr;
}
}

extern "C" duckdb_value duckdb_vx_client_context_try_get_current_setting(duckdb_vx_client_context context,
const char *key) {
extern "C" duckdb_value duckdb_client_context_try_get_current_setting(duckdb_client_context context,
const char *key) {
if (!context || !key) {
return nullptr;
}
Expand Down
11 changes: 11 additions & 0 deletions vortex-duckdb/cpp/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "include/duckdb_vx/config.h"
#include "duckdb.hpp"
#include "duckdb/main/capi/capi_internal.hpp"
#include "duckdb/main/config.hpp"
#include <string>
#include <memory>
Expand All @@ -14,6 +15,16 @@ using namespace duckdb;

extern "C" {

// Returns a handle to the configuration of an already-open database.
//
// `database` is the opaque C API handle; it is reinterpreted as the C API's
// `DatabaseWrapper` to reach the underlying database instance.
//
// The returned pointer is the address of the `DBConfig` reference obtained
// from `DBConfig::GetConfig`; nothing is allocated or copied here.
// NOTE(review): because the handle aliases the instance's own config, callers
// presumably must not pass it to `duckdb_destroy_config` and must not use it
// after the database is closed — confirm against the Rust wrapper.
//
// Returns nullptr when `database` is null, when the wrapper holds no database
// or instance, or when any C++ exception is raised while resolving the config.
duckdb_config duckdb_vx_database_get_config(duckdb_database database) {
    if (!database) {
        return nullptr;
    }

    // This is an extern "C" entry point: a C++ exception must never unwind
    // through it into the foreign caller, so everything that touches DuckDB
    // internals is wrapped and failures are reported as nullptr.
    try {
        auto *wrapper = reinterpret_cast<DatabaseWrapper *>(database);
        // Guard both smart pointers explicitly instead of relying on a
        // dereference of an empty pointer to fail in a well-defined way.
        if (!wrapper->database || !wrapper->database->instance) {
            return nullptr;
        }

        auto &config = DBConfig::GetConfig(*wrapper->database->instance);
        return reinterpret_cast<duckdb_config>(&config);
    } catch (...) {
        return nullptr;
    }
}

duckdb_state duckdb_vx_get_config_value(duckdb_config config, const char *key, duckdb_value *out_value) {
if (!config || !key || !out_value) {
return DuckDBError;
Expand Down
21 changes: 10 additions & 11 deletions vortex-duckdb/cpp/copy_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

#include "duckdb_vx.h"
#include "duckdb_vx/data.hpp"

#include "duckdb/function/copy_function.hpp"

#include "duckdb/main/capi/capi_internal.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parser/parsed_data/create_copy_function_info.hpp"
Expand Down Expand Up @@ -76,7 +75,7 @@ unique_ptr<GlobalFunctionData>
c_init_global(ClientContext &context, FunctionData &bind_data, const string &file_path) {
auto &bind = bind_data.Cast<CCopyBindData>();
duckdb_vx_error error_out = nullptr;
auto global_data = bind.vtab.init_global(reinterpret_cast<duckdb_vx_client_context>(&context),
auto global_data = bind.vtab.init_global(reinterpret_cast<duckdb_client_context>(&context),
bind.ffi_data->DataPtr(),
file_path.c_str(),
&error_out);
Expand Down Expand Up @@ -131,12 +130,13 @@ extern "C" duckdb_vx_copy_func_vtab_t *get_vtab_one() {
return &copy_vtab_one;
}

extern "C" duckdb_state duckdb_vx_copy_func_register_vtab_one(duckdb_connection ffi_conn) {
if (!ffi_conn) {
extern "C" duckdb_state duckdb_vx_copy_func_register_vtab_one(duckdb_database ffi_db) {
if (!ffi_db) {
return DuckDBError;
}

auto conn = reinterpret_cast<Connection *>(ffi_conn);
auto wrapper = reinterpret_cast<duckdb::DatabaseWrapper *>(ffi_db);
auto db = wrapper->database->instance;
auto copy_function = CopyFunction(copy_vtab_one.name);

copy_function.copy_to_bind = c_bind_one;
Expand All @@ -154,11 +154,10 @@ extern "C" duckdb_state duckdb_vx_copy_func_register_vtab_one(duckdb_connection
// TODO(joe): handle parameters as in table_function

try {
CreateCopyFunctionInfo info(std::move(copy_function));
auto &system_catalog = Catalog::GetSystemCatalog(*conn->context->db);
auto data = CatalogTransaction::GetSystemTransaction(*conn->context->db);
system_catalog.CreateCopyFunction(data, info);

auto &system_catalog = Catalog::GetSystemCatalog(*db);
auto data = CatalogTransaction::GetSystemTransaction(*db);
CreateCopyFunctionInfo copy_info(std::move(copy_function));
system_catalog.CreateCopyFunction(data, copy_info);
} catch (...) {
return DuckDBError;
}
Expand Down
Loading
Loading