Skip to content

Commit dc6e7cf

Browse files
authored
fix: display the failed sqllogictest file and query that failed in case of a panic (#18785)
## Which issue does this PR close? - Closes #18784. ## Rationale for this change To be able to see which sqllogictest file and query failed when a test panics. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Manually, by adding a panic in the join, as in the PR linked from the issue, and seeing that it works for datafusion. Added a panic in `sort_merge_join` and ran: ``` RUST_BACKTRACE=1 cargo test --test sqllogictests -- join ``` <details> <summary>Logs for running join slt tests (7 files)</summary> ``` Running bin/sqllogictests.rs (target/debug/deps/sqllogictests-0092c8e14262530a) [00:00:00] #####################------------------- 3/6 "join_lists.slt" [00:00:00] #################----------------------- 12/30 "join_is_not_distinct_from.slt" [00:00:00] ######---------------------------------- 11/81 "sort_merge_join.slt" [00:00:00] ##-------------------------------------- 7/224 "join_disable_repartition_joins.slt" [00:00:00] ######---------------------------------- 11/81 "sort_merge_join.slt" [00:00:00] #####----------------------------------- 27/224 "join_disable_repartition_joins.slt" [00:00:00] #####----------------------------------- 26/217 "join_only.slt" [00:00:00] ##-------------------------------------- 19/531 "joins.slt" 0: __rustc::rust_begin_unwind at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/panicking.rs:698:5 1: core::panicking::panic_fmt at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/core/src/panicking.rs:75:14 2: datafusion_physical_plan::joins::sort_merge_join::stream::SortMergeJoinStream::filter_record_batch_by_join_type at /Users/rluvaton/dev/open-source/apache/datafusion/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs:1573:17 3: datafusion_physical_plan::joins::sort_merge_join::stream::SortMergeJoinStream::filter_joined_batch at 
/Users/rluvaton/dev/open-source/apache/datafusion/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs:1536:14 4: <datafusion_physical_plan::joins::sort_merge_join::stream::SortMergeJoinStream as futures_core::stream::Stream>::poll_next at /Users/rluvaton/dev/open-source/apache/datafusion/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs:603:54 5: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:130:33 6: <S as futures_core::stream::TryStream>::try_poll_next at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:206:14 7: <futures_util::stream::try_stream::try_collect::TryCollect<St,C> as core::future::future::Future>::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/try_stream/try_collect.rs:46:47 8: datafusion_physical_plan::common::collect::{{closure}} at /Users/rluvaton/dev/open-source/apache/datafusion/datafusion/physical-plan/src/common.rs:43:36 9: datafusion_sqllogictest::engines::datafusion_engine::runner::run_query::{{closure}} at ./src/engines/datafusion_engine/runner.rs:151:53 10: <datafusion_sqllogictest::engines::datafusion_engine::runner::DataFusion as sqllogictest::runner::AsyncDB>::run::{{closure}} at ./src/engines/datafusion_engine/runner.rs:101:84 11: <core::pin::Pin<P> as core::future::future::Future>::poll at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/core/src/future/future.rs:133:9 12: sqllogictest::runner::Runner<D,M>::apply_record::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sqllogictest-0.28.4/src/runner.rs:893:62 13: sqllogictest::runner::Runner<D,M>::run_async_no_retry::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sqllogictest-0.28.4/src/runner.rs:1031:56 14: 
sqllogictest::runner::Runner<D,M>::run_async::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sqllogictest-0.28.4/src/runner.rs:1007:52 15: sqllogictests::run_file_in_runner::{{closure}} at ./bin/sqllogictests.rs:386:52 16: sqllogictests::run_test_file::{{closure}} at ./bin/sqllogictests.rs:365:60 17: sqllogictests::run_tests::{{closure}}::{{closure}}::{{closure}} at ./bin/sqllogictests.rs:189:26 18: datafusion_common_runtime::trace_utils::trace_future::{{closure}} at /Users/rluvaton/dev/open-source/apache/datafusion/datafusion/common-runtime/src/trace_utils.rs:137:29 19: <core::pin::Pin<P> as core::future::future::Future>::poll at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/core/src/future/future.rs:133:9 20: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/future/future/map.rs:55:44 21: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/lib.rs:86:35 22: <core::pin::Pin<P> as core::future::future::Future>::poll at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/core/src/future/future.rs:133:9 23: tokio::runtime::task::core::Core<T,S>::poll::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/core.rs:365:24 24: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/loom/std/unsafe_cell.rs:16:9 25: tokio::runtime::task::core::Core<T,S>::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/core.rs:354:30 26: tokio::runtime::task::harness::poll_future::{{closure}} at 
/Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/harness.rs:535:30 27: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/core/src/panic/unwind_safe.rs:274:9 28: std::panicking::catch_unwind::do_call at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/panicking.rs:590:40 29: ___rust_try 30: std::panicking::catch_unwind at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/panicking.rs:553:19 31: std::panic::catch_unwind at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/panic.rs:359:14 32: tokio::runtime::task::harness::poll_future at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/harness.rs:523:18 33: tokio::runtime::task::harness::Harness<T,S>::poll_inner at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/harness.rs:210:27 34: tokio::runtime::task::harness::Harness<T,S>::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/harness.rs:155:20 35: tokio::runtime::task::raw::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/raw.rs:325:13 36: tokio::runtime::task::raw::RawTask::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/raw.rs:255:18 37: tokio::runtime::task::LocalNotified<S>::run at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/mod.rs:509:13 38: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/scheduler/multi_thread/worker.rs:600:18 39: tokio::task::coop::with_budget at 
/Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/task/coop/mod.rs:167:5 40: tokio::task::coop::budget at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/task/coop/mod.rs:133:5 41: tokio::runtime::scheduler::multi_thread::worker::Context::run_task at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/scheduler/multi_thread/worker.rs:591:9 42: tokio::runtime::scheduler::multi_thread::worker::Context::run at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/scheduler/multi_thread/worker.rs:539:29 43: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/scheduler/multi_thread/worker.rs:504:24 44: tokio::runtime::context::scoped::Scoped<T>::set at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/context/scoped.rs:40:9 45: tokio::runtime::context::set_scheduler::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/context.rs:176:38 46: std::thread::local::LocalKey<T>::try_with at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/thread/local.rs:315:12 47: std::thread::local::LocalKey<T>::with at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/thread/local.rs:279:20 48: tokio::runtime::context::set_scheduler at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/context.rs:176:17 49: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/scheduler/multi_thread/worker.rs:499:9 50: tokio::runtime::context::runtime::enter_runtime at 
/Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/context/runtime.rs:65:16 51: tokio::runtime::scheduler::multi_thread::worker::run at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/scheduler/multi_thread/worker.rs:491:5 52: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/scheduler/multi_thread/worker.rs:457:45 53: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/blocking/task.rs:42:21 54: tokio::runtime::task::core::Core<T,S>::poll::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/core.rs:365:24 55: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/loom/std/unsafe_cell.rs:16:9 56: tokio::runtime::task::core::Core<T,S>::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/core.rs:354:30 57: tokio::runtime::task::harness::poll_future::{{closure}} at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/harness.rs:535:30 58: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/core/src/panic/unwind_safe.rs:274:9 59: std::panicking::catch_unwind::do_call at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/panicking.rs:590:40 60: ___rust_try 61: std::panicking::catch_unwind at /rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/panicking.rs:553:19 62: std::panic::catch_unwind at 
/rustc/f8297e351a40c1439a467bbbb6879088047f50b3/library/std/src/panic.rs:359:14 63: tokio::runtime::task::harness::poll_future at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/harness.rs:523:18 64: tokio::runtime::task::harness::Harness<T,S>::poll_inner at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/harness.rs:210:27 65: tokio::runtime::task::harness::Harness<T,S>::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/harness.rs:155:20 66: tokio::runtime::task::raw::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/raw.rs:325:13 67: tokio::runtime::task::raw::RawTask::poll at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/raw.rs:255:18 68: tokio::runtime::task::UnownedTask<S>::run at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/task/mod.rs:546:13 69: tokio::runtime::blocking::pool::Task::run at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/blocking/pool.rs:161:19 70: tokio::runtime::blocking::pool::Inner::run at /Users/rluvaton/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/blocking/pool.rs:516:22 Completed 7 test files in 2 seconds failure in sort_merge_join.slt for sql SELECT * FROM t1 RIGHT JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b caused by External error: task 19 panicked with message "test" Error: Execution("1 failures") error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests` Caused by: process didn't exit successfully: `/Users/rluvaton/dev/open-source/apache/datafusion/target/debug/deps/sqllogictests-0092c8e14262530a join` (exit status: 1) ``` </details> ## Are there any user-facing changes? Yes, added new struct for tracking running sql
1 parent 99ab87c commit dc6e7cf

File tree

7 files changed

+240
-15
lines changed

7 files changed

+240
-15
lines changed

datafusion/sqllogictest/bin/sqllogictests.rs

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use datafusion::common::utils::get_available_parallelism;
2121
use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result};
2222
use datafusion_sqllogictest::{
2323
df_value_validator, read_dir_recursive, setup_scratch_dir, should_skip_file,
24-
should_skip_record, value_normalizer, DataFusion, DataFusionSubstraitRoundTrip,
25-
Filter, TestContext,
24+
should_skip_record, value_normalizer, CurrentlyExecutingSqlTracker, DataFusion,
25+
DataFusionSubstraitRoundTrip, Filter, TestContext,
2626
};
2727
use futures::stream::StreamExt;
2828
use indicatif::{
@@ -41,6 +41,7 @@ use crate::postgres_container::{
4141
initialize_postgres_container, terminate_postgres_container,
4242
};
4343
use datafusion::common::runtime::SpawnedTask;
44+
use futures::FutureExt;
4445
use std::ffi::OsStr;
4546
use std::fs;
4647
use std::path::{Path, PathBuf};
@@ -154,6 +155,11 @@ async fn run_tests() -> Result<()> {
154155
let m_style_clone = m_style.clone();
155156
let filters = options.filters.clone();
156157

158+
let relative_path = test_file.relative_path.clone();
159+
160+
let currently_running_sql_tracker = CurrentlyExecutingSqlTracker::new();
161+
let currently_running_sql_tracker_clone =
162+
currently_running_sql_tracker.clone();
157163
SpawnedTask::spawn(async move {
158164
match (
159165
options.postgres_runner,
@@ -167,6 +173,7 @@ async fn run_tests() -> Result<()> {
167173
m_clone,
168174
m_style_clone,
169175
filters.as_ref(),
176+
currently_running_sql_tracker_clone,
170177
)
171178
.await?
172179
}
@@ -177,12 +184,19 @@ async fn run_tests() -> Result<()> {
177184
m_clone,
178185
m_style_clone,
179186
filters.as_ref(),
187+
currently_running_sql_tracker_clone,
180188
)
181189
.await?
182190
}
183191
(false, true, _) => {
184-
run_complete_file(test_file, validator, m_clone, m_style_clone)
185-
.await?
192+
run_complete_file(
193+
test_file,
194+
validator,
195+
m_clone,
196+
m_style_clone,
197+
currently_running_sql_tracker_clone,
198+
)
199+
.await?
186200
}
187201
(true, false, _) => {
188202
run_test_file_with_postgres(
@@ -191,6 +205,7 @@ async fn run_tests() -> Result<()> {
191205
m_clone,
192206
m_style_clone,
193207
filters.as_ref(),
208+
currently_running_sql_tracker_clone,
194209
)
195210
.await?
196211
}
@@ -200,21 +215,50 @@ async fn run_tests() -> Result<()> {
200215
validator,
201216
m_clone,
202217
m_style_clone,
218+
currently_running_sql_tracker_clone,
203219
)
204220
.await?
205221
}
206222
}
207223
Ok(()) as Result<()>
208224
})
209225
.join()
226+
.map(move |result| (result, relative_path, currently_running_sql_tracker))
210227
})
211228
// run up to num_cpus streams in parallel
212229
.buffer_unordered(options.test_threads)
213-
.flat_map(|result| {
230+
.flat_map(|(result, test_file_path, current_sql)| {
214231
// Filter out any Ok() leaving only the DataFusionErrors
215232
futures::stream::iter(match result {
216233
// Tokio panic error
217-
Err(e) => Some(DataFusionError::External(Box::new(e))),
234+
Err(e) => {
235+
let error = DataFusionError::External(Box::new(e));
236+
let current_sql = current_sql.get_currently_running_sqls();
237+
238+
if current_sql.is_empty() {
239+
Some(error.context(format!(
240+
"failure in {} with no currently running sql tracked",
241+
test_file_path.display()
242+
)))
243+
} else if current_sql.len() == 1 {
244+
let sql = &current_sql[0];
245+
Some(error.context(format!(
246+
"failure in {} for sql {sql}",
247+
test_file_path.display()
248+
)))
249+
} else {
250+
let sqls = current_sql
251+
.iter()
252+
.enumerate()
253+
.map(|(i, sql)| format!("\n[{}]: {}", i + 1, sql))
254+
.collect::<String>();
255+
Some(error.context(format!(
256+
"failure in {} for multiple currently running sqls: {}",
257+
test_file_path.display(),
258+
sqls
259+
)))
260+
}
261+
}
218262
Ok(thread_result) => thread_result.err(),
219263
})
220264
})
@@ -247,6 +291,7 @@ async fn run_test_file_substrait_round_trip(
247291
mp: MultiProgress,
248292
mp_style: ProgressStyle,
249293
filters: &[Filter],
294+
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
250295
) -> Result<()> {
251296
let TestFile {
252297
path,
@@ -269,7 +314,8 @@ async fn run_test_file_substrait_round_trip(
269314
test_ctx.session_ctx().clone(),
270315
relative_path.clone(),
271316
pb.clone(),
272-
))
317+
)
318+
.with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone()))
273319
});
274320
runner.add_label("DatafusionSubstraitRoundTrip");
275321
runner.with_column_validator(strict_column_validator);
@@ -286,6 +332,7 @@ async fn run_test_file(
286332
mp: MultiProgress,
287333
mp_style: ProgressStyle,
288334
filters: &[Filter],
335+
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
289336
) -> Result<()> {
290337
let TestFile {
291338
path,
@@ -308,7 +355,8 @@ async fn run_test_file(
308355
test_ctx.session_ctx().clone(),
309356
relative_path.clone(),
310357
pb.clone(),
311-
))
358+
)
359+
.with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone()))
312360
});
313361
runner.add_label("Datafusion");
314362
runner.with_column_validator(strict_column_validator);
@@ -402,6 +450,7 @@ async fn run_test_file_with_postgres(
402450
mp: MultiProgress,
403451
mp_style: ProgressStyle,
404452
filters: &[Filter],
453+
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
405454
) -> Result<()> {
406455
use datafusion_sqllogictest::Postgres;
407456
let TestFile {
@@ -417,7 +466,11 @@ async fn run_test_file_with_postgres(
417466
pb.set_message(format!("{:?}", &relative_path));
418467

419468
let mut runner = sqllogictest::Runner::new(|| {
420-
Postgres::connect(relative_path.clone(), pb.clone())
469+
Postgres::connect_with_tracked_sql(
470+
relative_path.clone(),
471+
pb.clone(),
472+
currently_executing_sql_tracker.clone(),
473+
)
421474
});
422475
runner.add_label("postgres");
423476
runner.with_column_validator(strict_column_validator);
@@ -435,6 +488,7 @@ async fn run_test_file_with_postgres(
435488
_mp: MultiProgress,
436489
_mp_style: ProgressStyle,
437490
_filters: &[Filter],
491+
_currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
438492
) -> Result<()> {
439493
use datafusion::common::plan_err;
440494
plan_err!("Can not run with postgres as postgres feature is not enabled")
@@ -445,6 +499,7 @@ async fn run_complete_file(
445499
validator: Validator,
446500
mp: MultiProgress,
447501
mp_style: ProgressStyle,
502+
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
448503
) -> Result<()> {
449504
let TestFile {
450505
path,
@@ -470,7 +525,8 @@ async fn run_complete_file(
470525
test_ctx.session_ctx().clone(),
471526
relative_path.clone(),
472527
pb.clone(),
473-
))
528+
)
529+
.with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone()))
474530
});
475531

476532
let col_separator = " ";
@@ -497,6 +553,7 @@ async fn run_complete_file_with_postgres(
497553
validator: Validator,
498554
mp: MultiProgress,
499555
mp_style: ProgressStyle,
556+
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
500557
) -> Result<()> {
501558
use datafusion_sqllogictest::Postgres;
502559
let TestFile {
@@ -516,7 +573,11 @@ async fn run_complete_file_with_postgres(
516573
pb.set_message(format!("{:?}", &relative_path));
517574

518575
let mut runner = sqllogictest::Runner::new(|| {
519-
Postgres::connect(relative_path.clone(), pb.clone())
576+
Postgres::connect_with_tracked_sql(
577+
relative_path.clone(),
578+
pb.clone(),
579+
currently_executing_sql_tracker.clone(),
580+
)
520581
});
521582
runner.add_label("postgres");
522583
runner.with_column_validator(strict_column_validator);
@@ -547,6 +608,7 @@ async fn run_complete_file_with_postgres(
547608
_validator: Validator,
548609
_mp: MultiProgress,
549610
_mp_style: ProgressStyle,
611+
_currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
550612
) -> Result<()> {
551613
use datafusion::common::plan_err;
552614
plan_err!("Can not run with postgres as postgres feature is not enabled")
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::sync::atomic::AtomicUsize;
20+
use std::sync::{Arc, Mutex};
21+
22+
/// Holds the SQL statements that are currently being executed.
23+
/// This is used to report the SQL statements that were still running in case of a crash.
///
/// The type is `Clone`, and both fields are behind `Arc`, so every clone shares the same
/// counter and the same map of running statements.
24+
#[derive(Clone)]
25+
pub struct CurrentlyExecutingSqlTracker {
26+
/// Counter that hands out the index for the next SQL statement.
27+
/// Used to uniquely identify each SQL statement even if they are the same.
28+
sql_index: Arc<AtomicUsize>,
29+
/// Map from statement index to SQL text for the statements currently being executed.
30+
/// The lock is NOT held for the duration of query execution; it is only acquired
31+
/// when updating the currently executed SQL statements, to allow for saving the last executed SQL
32+
/// in case of a crash.
33+
currently_executed_sqls: Arc<Mutex<HashMap<usize, String>>>,
34+
}
35+
36+
impl Default for CurrentlyExecutingSqlTracker {
37+
/// Creates an empty tracker; delegates to [`CurrentlyExecutingSqlTracker::new`].
fn default() -> Self {
38+
Self::new()
39+
}
40+
}
41+
42+
impl CurrentlyExecutingSqlTracker {
43+
/// Create a tracker with no running SQL statements and the index counter starting at 0.
pub fn new() -> Self {
44+
Self {
45+
sql_index: Arc::new(AtomicUsize::new(0)),
46+
currently_executed_sqls: Arc::new(Mutex::new(HashMap::new())),
47+
}
48+
}
49+
50+
/// Record `sql` as a currently executing SQL statement.
51+
///
52+
/// Returns a key that must be passed to [`Self::remove_sql`] once the statement is done.
53+
///
54+
/// We deliberately do not return a guard that automatically removes the SQL statement when dropped,
55+
/// because on panic the drop can be called, and it would remove the SQL statement before we can log it.
56+
#[must_use = "The returned index must be used to remove the SQL statement when done."]
57+
pub fn set_sql(&self, sql: impl Into<String>) -> usize {
58+
// `fetch_add` returns the previous counter value, so each call gets a distinct key.
let index = self
59+
.sql_index
60+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
61+
self.currently_executed_sqls
62+
.lock()
63+
// A poisoned mutex is not treated as fatal: take the guard out of the error and continue.
.unwrap_or_else(|e| e.into_inner())
64+
.insert(index, sql.into());
65+
index
66+
}
67+
68+
/// Remove a currently executing SQL statement using the key returned by [`Self::set_sql`].
///
/// Does nothing if no statement is stored under `index`.
69+
pub fn remove_sql(&self, index: usize) {
70+
self.currently_executed_sqls
71+
.lock()
72+
// Same poisoned-mutex recovery as in `set_sql`.
.unwrap_or_else(|e| e.into_inner())
73+
.remove(&index);
74+
}
75+
76+
/// Get a copy of the SQL statements that are currently recorded as executing.
///
/// The statements come from `HashMap::values()`, so the order of the returned `Vec`
/// is unspecified and is not the order in which the statements were set.
77+
pub fn get_currently_running_sqls(&self) -> Vec<String> {
78+
self.currently_executed_sqls
79+
.lock()
80+
// Same poisoned-mutex recovery as in `set_sql`.
.unwrap_or_else(|e| e.into_inner())
81+
.values()
82+
.cloned()
83+
.collect()
84+
}
85+
}

datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ use std::sync::Arc;
1919
use std::{path::PathBuf, time::Duration};
2020

2121
use super::{error::Result, normalize, DFSqlLogicTestError};
22+
use crate::engines::currently_executed_sql::CurrentlyExecutingSqlTracker;
23+
use crate::engines::output::{DFColumnType, DFOutput};
24+
use crate::is_spark_path;
2225
use arrow::record_batch::RecordBatch;
2326
use async_trait::async_trait;
2427
use datafusion::physical_plan::common::collect;
@@ -30,13 +33,11 @@ use log::{debug, log_enabled, warn};
3033
use sqllogictest::DBOutput;
3134
use tokio::time::Instant;
3235

33-
use crate::engines::output::{DFColumnType, DFOutput};
34-
use crate::is_spark_path;
35-
3636
pub struct DataFusion {
3737
ctx: SessionContext,
3838
relative_path: PathBuf,
3939
pb: ProgressBar,
40+
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
4041
}
4142

4243
impl DataFusion {
@@ -45,6 +46,20 @@ impl DataFusion {
4546
ctx,
4647
relative_path,
4748
pb,
49+
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(),
50+
}
51+
}
52+
53+
/// Add a tracker that will track the currently executed SQL statement.
54+
///
55+
/// This is useful for logging and debugging purposes.
56+
pub fn with_currently_executing_sql_tracker(
57+
self,
58+
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
59+
) -> Self {
60+
Self {
61+
currently_executing_sql_tracker,
62+
..self
4863
}
4964
}
5065

@@ -79,10 +94,14 @@ impl sqllogictest::AsyncDB for DataFusion {
7994
);
8095
}
8196

97+
let tracked_sql = self.currently_executing_sql_tracker.set_sql(sql);
98+
8299
let start = Instant::now();
83100
let result = run_query(&self.ctx, is_spark_path(&self.relative_path), sql).await;
84101
let duration = start.elapsed();
85102

103+
self.currently_executing_sql_tracker.remove_sql(tracked_sql);
104+
86105
if duration.gt(&Duration::from_millis(500)) {
87106
self.update_slow_count();
88107
}

0 commit comments

Comments
 (0)