Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use async_trait::async_trait;
use futures::StreamExt;
use tokio::task::spawn_blocking;

use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_plan::common::AbortOnDropSingle;
Expand Down Expand Up @@ -100,6 +100,7 @@ pub struct StreamConfig {
encoding: StreamEncoding,
header: bool,
order: Vec<Vec<Expr>>,
constraints: Constraints,
}

impl StreamConfig {
Expand All @@ -118,6 +119,7 @@ impl StreamConfig {
encoding: StreamEncoding::Csv,
order: vec![],
header: false,
constraints: Constraints::empty(),
}
}

Expand Down Expand Up @@ -145,6 +147,12 @@ impl StreamConfig {
self
}

/// Assign constraints
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = constraints;
self
}

fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
let file = File::open(&self.location)?;
let schema = self.schema.clone();
Expand Down Expand Up @@ -215,6 +223,10 @@ impl TableProvider for StreamTable {
self.0.schema.clone()
}

/// Expose the constraints held by the wrapped stream configuration.
///
/// This implementation always returns `Some`.
fn constraints(&self) -> Option<&Constraints> {
    let config = &self.0;
    Some(&config.constraints)
}

/// Report the kind of table this provider represents; always [`TableType::Base`].
fn table_type(&self) -> TableType {
TableType::Base
}
Expand Down
31 changes: 31 additions & 0 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4047,3 +4047,34 @@ set datafusion.sql_parser.dialect = 'Generic';

statement ok
drop table aggregate_test_100;


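# Create an unbounded external table whose column c is declared as the
# primary key.
# NOTE(review): the statement below does not use the UNBOUNDED keyword --
# confirm that the table is actually created as unbounded.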
statement ok
CREATE EXTERNAL TABLE unbounded_multiple_ordered_table_with_pk (
a0 INTEGER,
a INTEGER,
b INTEGER,
c INTEGER primary key,
d INTEGER
)
STORED AS CSV
WITH HEADER ROW
WITH ORDER (a ASC, b ASC)
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv';

# The query below can be executed even though column a is neither grouped nor
# aggregated, because the GROUP BY column c is the primary key.
query III rowsort
SELECT c, a, SUM(d)
FROM unbounded_multiple_ordered_table_with_pk
GROUP BY c
ORDER BY c
LIMIT 5
----
0 0 0
1 0 2
2 0 0
3 0 0
4 0 1