Skip to content

Commit bc414df

Browse files
authored
refactor: introducing datafusion feature in arrow-pg (#93)
* feat: add datafusion feature to arrow-pg * refactor: move datafusion encoding logic into arrow-pg * ci: add test for cargo-pg default feature Signed-off-by: Ning Sun <sunning@greptime.com> --------- Signed-off-by: Ning Sun <sunning@greptime.com>
1 parent 517145f commit bc414df

File tree

14 files changed

+69
-22
lines changed

14 files changed

+69
-22
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ jobs:
5353
override: true
5454
- name: Build and run tests
5555
run: cargo test --all-features
56+
- name: Test arrow-pg default features
57+
working-directory: arrow-pg
58+
run: cargo test
5659

5760
integration:
5861
name: Integration tests

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/target
22
.direnv
33
.envrc
4-
.vscode
4+
.vscode
5+
.aider*

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

arrow-pg/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,16 @@ documentation.workspace = true
1212
readme = "../README.md"
1313
rust-version.workspace = true
1414

15+
[features]
16+
default = ["arrow"]
17+
arrow = ["dep:arrow"]
18+
datafusion = ["dep:datafusion"]
19+
1520
[dependencies]
16-
arrow.workspace = true
21+
arrow = { workspace = true, optional = true }
1722
bytes.workspace = true
1823
chrono.workspace = true
24+
datafusion = { workspace = true, optional = true }
1925
futures.workspace = true
2026
pgwire = { workspace = true, features = ["server-api"] }
2127
postgres-types.workspace = true

arrow-pg/src/datatypes.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::sync::Arc;
22

3-
use arrow::datatypes::*;
4-
use arrow::record_batch::RecordBatch;
3+
#[cfg(not(feature = "datafusion"))]
4+
use arrow::{datatypes::*, record_batch::RecordBatch};
5+
#[cfg(feature = "datafusion")]
6+
use datafusion::arrow::{datatypes::*, record_batch::RecordBatch};
7+
58
use pgwire::api::portal::Format;
69
use pgwire::api::results::FieldInfo;
710
use pgwire::api::Type;
@@ -11,6 +14,9 @@ use postgres_types::Kind;
1114

1215
use crate::row_encoder::RowEncoder;
1316

17+
#[cfg(feature = "datafusion")]
18+
pub mod df;
19+
1420
pub fn into_pg_type(arrow_type: &DataType) -> PgWireResult<Type> {
1521
Ok(match arrow_type {
1622
DataType::Null => Type::UNKNOWN,

datafusion-postgres/src/datatypes.rs renamed to arrow-pg/src/datatypes/df.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ use pgwire::messages::data::DataRow;
1717
use rust_decimal::prelude::ToPrimitive;
1818
use rust_decimal::Decimal;
1919

20-
use arrow_pg::datatypes::{arrow_schema_to_pg_fields, encode_recordbatch, into_pg_type};
20+
use super::{arrow_schema_to_pg_fields, encode_recordbatch, into_pg_type};
2121

22-
pub(crate) async fn encode_dataframe<'a>(
22+
pub async fn encode_dataframe<'a>(
2323
df: DataFrame,
2424
format: &Format,
2525
) -> PgWireResult<QueryResponse<'a>> {
@@ -51,7 +51,7 @@ pub(crate) async fn encode_dataframe<'a>(
5151
/// If the type is empty or unknown, we fallback to datafusion inferenced type
5252
/// from `inferenced_types`.
5353
/// An error will be raised when neither sources can provide type information.
54-
pub(crate) fn deserialize_parameters<S>(
54+
pub fn deserialize_parameters<S>(
5555
portal: &Portal<S>,
5656
inferenced_types: &[Option<&DataType>],
5757
) -> PgWireResult<ParamValues>

arrow-pg/src/encoder.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ use std::io::Write;
33
use std::str::FromStr;
44
use std::sync::Arc;
55

6-
use arrow::array::*;
7-
use arrow::datatypes::*;
6+
#[cfg(not(feature = "datafusion"))]
7+
use arrow::{array::*, datatypes::*};
88
use bytes::BufMut;
99
use bytes::BytesMut;
1010
use chrono::{NaiveDate, NaiveDateTime};
11+
#[cfg(feature = "datafusion")]
12+
use datafusion::arrow::{array::*, datatypes::*};
1113
use pgwire::api::results::DataRowEncoder;
1214
use pgwire::api::results::FieldFormat;
1315
use pgwire::error::PgWireError;

arrow-pg/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
//! Arrow data encoding and type mapping for Postgres(pgwire).
2+
3+
// #[cfg(all(feature = "arrow", feature = "datafusion"))]
4+
// compile_error!("Feature arrow and datafusion cannot be enabled at same time. Use no-default-features when activating datafusion");
5+
16
pub mod datatypes;
27
pub mod encoder;
38
mod error;

arrow-pg/src/list_encoder.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,36 @@
11
use std::{str::FromStr, sync::Arc};
22

3-
use arrow::array::{
4-
timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
5-
LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
6-
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
7-
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
8-
};
3+
#[cfg(not(feature = "datafusion"))]
94
use arrow::{
5+
array::{
6+
timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
7+
LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
8+
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
9+
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
10+
},
1011
datatypes::{
1112
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type,
1213
Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
1314
Time64NanosecondType, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
1415
},
1516
temporal_conversions::{as_date, as_time},
1617
};
18+
#[cfg(feature = "datafusion")]
19+
use datafusion::arrow::{
20+
array::{
21+
timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
22+
LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
23+
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
24+
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
25+
},
26+
datatypes::{
27+
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type,
28+
Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
29+
Time64NanosecondType, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
30+
},
31+
temporal_conversions::{as_date, as_time},
32+
};
33+
1734
use bytes::{BufMut, BytesMut};
1835
use chrono::{DateTime, TimeZone, Utc};
1936
use pgwire::api::results::FieldFormat;

arrow-pg/src/row_encoder.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
use std::sync::Arc;
22

3+
#[cfg(not(feature = "datafusion"))]
34
use arrow::array::RecordBatch;
5+
#[cfg(feature = "datafusion")]
6+
use datafusion::arrow::array::RecordBatch;
7+
48
use pgwire::{
59
api::results::{DataRowEncoder, FieldInfo},
610
error::PgWireResult,

0 commit comments

Comments
 (0)