Skip to content

Commit 1cc67ab

Browse files
authored
datafusion-cli: Refactor statement execution logic (#16634)
1 parent 5a48857 commit 1cc67ab

File tree

1 file changed

+64
-18
lines changed

1 file changed

+64
-18
lines changed

datafusion-cli/src/exec.rs

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@ pub(super) async fn exec_and_print(
214214
print_options: &PrintOptions,
215215
sql: String,
216216
) -> Result<()> {
217-
let now = Instant::now();
218217
let task_ctx = ctx.task_ctx();
219218
let options = task_ctx.session_config().options();
220219
let dialect = &options.sql_parser.dialect;
@@ -228,25 +227,43 @@ pub(super) async fn exec_and_print(
228227

229228
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
230229
for statement in statements {
231-
let adjusted =
232-
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
230+
StatementExecutor::new(statement)
231+
.execute(ctx, print_options)
232+
.await?;
233+
}
233234

234-
let plan = create_plan(ctx, statement.clone(), false).await?;
235-
let adjusted = adjusted.with_plan(&plan);
235+
Ok(())
236+
}
236237

237-
let df = match ctx.execute_logical_plan(plan).await {
238-
Ok(df) => df,
239-
Err(DataFusionError::ObjectStore(Generic { store, source: _ }))
240-
if "S3".eq_ignore_ascii_case(store)
241-
&& matches!(&statement, Statement::CreateExternalTable(_)) =>
242-
{
243-
warn!("S3 region is incorrect, auto-detecting the correct region (this may be slow). Consider updating your region configuration.");
244-
let plan = create_plan(ctx, statement, true).await?;
245-
ctx.execute_logical_plan(plan).await?
246-
}
247-
Err(e) => return Err(e),
248-
};
238+
/// Executor for SQL statements, including special handling for S3 region detection retry logic
239+
struct StatementExecutor {
240+
statement: Statement,
241+
statement_for_retry: Option<Statement>,
242+
}
243+
244+
impl StatementExecutor {
245+
fn new(statement: Statement) -> Self {
246+
let statement_for_retry = matches!(statement, Statement::CreateExternalTable(_))
247+
.then(|| statement.clone());
248+
249+
Self {
250+
statement,
251+
statement_for_retry,
252+
}
253+
}
254+
255+
async fn execute(
256+
self,
257+
ctx: &dyn CliSessionContext,
258+
print_options: &PrintOptions,
259+
) -> Result<()> {
260+
let now = Instant::now();
261+
let (df, adjusted) = self
262+
.create_and_execute_logical_plan(ctx, print_options)
263+
.await?;
249264
let physical_plan = df.create_physical_plan().await?;
265+
let task_ctx = ctx.task_ctx();
266+
let options = task_ctx.session_config().options();
250267

251268
// Track memory usage for the query result if it's bounded
252269
let mut reservation =
@@ -296,9 +313,38 @@ pub(super) async fn exec_and_print(
296313
)?;
297314
reservation.free();
298315
}
316+
317+
Ok(())
299318
}
300319

301-
Ok(())
320+
async fn create_and_execute_logical_plan(
321+
mut self,
322+
ctx: &dyn CliSessionContext,
323+
print_options: &PrintOptions,
324+
) -> Result<(datafusion::dataframe::DataFrame, AdjustedPrintOptions)> {
325+
let adjusted = AdjustedPrintOptions::new(print_options.clone())
326+
.with_statement(&self.statement);
327+
328+
let plan = create_plan(ctx, self.statement, false).await?;
329+
let adjusted = adjusted.with_plan(&plan);
330+
331+
let df = match ctx.execute_logical_plan(plan).await {
332+
Ok(df) => Ok(df),
333+
Err(DataFusionError::ObjectStore(Generic { store, source: _ }))
334+
if "S3".eq_ignore_ascii_case(store)
335+
&& self.statement_for_retry.is_some() =>
336+
{
337+
warn!("S3 region is incorrect, auto-detecting the correct region (this may be slow). Consider updating your region configuration.");
338+
let plan =
339+
create_plan(ctx, self.statement_for_retry.take().unwrap(), true)
340+
.await?;
341+
ctx.execute_logical_plan(plan).await
342+
}
343+
Err(e) => Err(e),
344+
}?;
345+
346+
Ok((df, adjusted))
347+
}
302348
}
303349

304350
/// Track adjustments to the print options based on the plan / statement being executed

0 commit comments

Comments
 (0)