Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SessionContext::read_batches #9197

Merged
merged 7 commits into from
Feb 14, 2024
Merged

Add SessionContext::read_batches #9197

merged 7 commits into from
Feb 14, 2024

Conversation

Lordworms
Copy link
Contributor

@Lordworms Lordworms commented Feb 12, 2024

Which issue does this PR close?

Closes #9157

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Feb 12, 2024
) -> Result<DataFrame> {
// check schema uniqueness
let mut batches = batches.into_iter().peekable();
let schema: SchemaRef = batches.peek().unwrap().schema().clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way we can avoid an unwrap here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps something like

Suggested change
let schema: SchemaRef = batches.peek().unwrap().schema().clone();
let schema: SchemaRef = if let Some(schema) = batches.peek() {
schema.clone()
} else {
// use empty schema)
Arc::new(Schema::new())
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it!

@alamb alamb changed the title feat: issue #9157 adding record_batches for Vec<BatchRecord> Add SessionContext::record_batches Feb 12, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @Lordworms 🙏

As @Jefffrey pointed out, I think the only thing left for this PR is to handle the case with zero batches and add a test for that case too

) -> Result<DataFrame> {
// check schema uniqueness
let mut batches = batches.into_iter().peekable();
let schema: SchemaRef = batches.peek().unwrap().schema().clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps something like

Suggested change
let schema: SchemaRef = batches.peek().unwrap().schema().clone();
let schema: SchemaRef = if let Some(schema) = batches.peek() {
schema.clone()
} else {
// use empty schema)
Arc::new(Schema::new())
};

],
)
.unwrap(),
RecordBatch::try_new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: I suggest making the test shorter by feeding in 2 batches rather than 5

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Lordworms -- this looks great.

cc @carols10cents

@@ -1489,7 +1489,7 @@ async fn test_read_batches_empty() -> Result<()> {
let state = SessionState::new_with_config_rt(config, runtime);
let ctx = SessionContext::new_with_state(state);

let schema = Arc::new(Schema::new(vec![
let _schema = Arc::new(Schema::new(vec![
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could simply delete this statement entirely as the Schema is not needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@alamb
Copy link
Contributor

alamb commented Feb 12, 2024

CI failure seems unrelated -- I retriggered it

Copy link
Contributor

@Jefffrey Jefffrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good, just have a minor question 👍

Comment on lines 950 to 951
let provider =
MemTable::try_new(schema, batches.map(|batch| vec![batch]).collect())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, is there any difference to mapping each batch into its own vec (which seemingly represents a different partition?) vs just collecting the batches into a single vec and passing in a single vec?

e.g. something like

Suggested change
let provider =
MemTable::try_new(schema, batches.map(|batch| vec![batch]).collect())?;
let provider =
MemTable::try_new(schema, vec![batches.collect()])?;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a difference (as you say the code in this PR makes its own partition). I think you are right that a single partition might be better (and DataFusion will repartition the plan into multiple partitions) if necessary

Is this something you can do @Lordworms ? Otherwise we can merge this PR as is and make it as a follow on change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, so what we need to do is to flatten all the batches into a vec? I wonder in what situation the datafusion would do the repartition? Is there any doc for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TLDR is that it is added by https://docs.rs/datafusion/latest/datafusion/physical_optimizer/enforce_distribution/struct.EnforceDistribution.html -- the rules about when it happens are non trivial but basically are "when it helps"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I would read about it and commit the changes soon.

@Jefffrey Jefffrey merged commit 61e9605 into apache:main Feb 14, 2024
23 checks passed
@Jefffrey
Copy link
Contributor

Thanks for this 👍

@Dandandan Dandandan changed the title Add SessionContext::record_batches Add SessionContext::read_batches Feb 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add API to read a Vec<RecordBatch> from SessionContext
3 participants