Skip to content

Commit 1abe26e

Browse files
committed
Substitute parking_lot::Mutex for std::sync::Mutex
1 parent 5764e4f commit 1abe26e

35 files changed

+143
-129
lines changed

ballista/rust/client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ log = "0.4"
3535
tokio = "1.0"
3636
tempfile = "3"
3737
sqlparser = "0.13"
38+
parking_lot = "0.11"
3839

3940
datafusion = { path = "../../../datafusion", version = "6.0.0" }
4041

ballista/rust/client/src/context.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use sqlparser::ast::Statement;
2121
use std::collections::HashMap;
2222
use std::fs;
2323
use std::path::PathBuf;
24-
use std::sync::{Arc, Mutex};
24+
use std::sync::Arc;
25+
use parking_lot::Mutex;
2526

2627
use ballista_core::config::BallistaConfig;
2728
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
@@ -142,7 +143,7 @@ impl BallistaContext {
142143

143144
// use local DataFusion context for now but later this might call the scheduler
144145
let mut ctx = {
145-
let guard = self.state.lock().unwrap();
146+
let guard = self.state.lock();
146147
create_df_ctx_with_ballista_query_planner(
147148
&guard.scheduler_host,
148149
guard.scheduler_port,
@@ -162,7 +163,7 @@ impl BallistaContext {
162163

163164
// use local DataFusion context for now but later this might call the scheduler
164165
let mut ctx = {
165-
let guard = self.state.lock().unwrap();
166+
let guard = self.state.lock();
166167
create_df_ctx_with_ballista_query_planner(
167168
&guard.scheduler_host,
168169
guard.scheduler_port,
@@ -186,7 +187,7 @@ impl BallistaContext {
186187

187188
// use local DataFusion context for now but later this might call the scheduler
188189
let mut ctx = {
189-
let guard = self.state.lock().unwrap();
190+
let guard = self.state.lock();
190191
create_df_ctx_with_ballista_query_planner(
191192
&guard.scheduler_host,
192193
guard.scheduler_port,
@@ -203,7 +204,7 @@ impl BallistaContext {
203204
name: &str,
204205
table: Arc<dyn TableProvider>,
205206
) -> Result<()> {
206-
let mut state = self.state.lock().unwrap();
207+
let mut state = self.state.lock();
207208
state.tables.insert(name.to_owned(), table);
208209
Ok(())
209210
}
@@ -280,7 +281,7 @@ impl BallistaContext {
280281
/// might require the schema to be inferred.
281282
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
282283
let mut ctx = {
283-
let state = self.state.lock().unwrap();
284+
let state = self.state.lock();
284285
create_df_ctx_with_ballista_query_planner(
285286
&state.scheduler_host,
286287
state.scheduler_port,
@@ -291,7 +292,7 @@ impl BallistaContext {
291292
let is_show = self.is_show_statement(sql).await?;
292293
// the show tables、 show columns sql can not run at scheduler because the tables is store at client
293294
if is_show {
294-
let state = self.state.lock().unwrap();
295+
let state = self.state.lock();
295296
ctx = ExecutionContext::with_config(
296297
ExecutionConfig::new().with_information_schema(
297298
state.config.default_with_information_schema(),
@@ -301,7 +302,7 @@ impl BallistaContext {
301302

302303
// register tables with DataFusion context
303304
{
304-
let state = self.state.lock().unwrap();
305+
let state = self.state.lock();
305306
for (name, prov) in &state.tables {
306307
ctx.register_table(
307308
TableReference::Bare { table: name },
@@ -483,7 +484,7 @@ mod tests {
483484
.unwrap();
484485

485486
{
486-
let mut guard = context.state.lock().unwrap();
487+
let mut guard = context.state.lock();
487488
let csv_table = guard.tables.get("single_nan");
488489

489490
if let Some(table_provide) = csv_table {

ballista/rust/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ parse_arg = "0.1.3"
4848
arrow-flight = { version = "8.0.0" }
4949
datafusion = { path = "../../../datafusion", version = "6.0.0" }
5050

51+
parking_lot = "0.11"
52+
5153
[dev-dependencies]
5254
tempfile = "3"
5355

ballista/rust/core/src/client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
//! Client API for sending requests to executors.
1919
20-
use std::sync::{Arc, Mutex};
20+
use std::sync::Arc;
21+
use parking_lot::Mutex;
2122
use std::{collections::HashMap, pin::Pin};
2223
use std::{
2324
convert::{TryFrom, TryInto},
@@ -154,7 +155,7 @@ impl Stream for FlightDataStream {
154155
self: std::pin::Pin<&mut Self>,
155156
cx: &mut Context<'_>,
156157
) -> Poll<Option<Self::Item>> {
157-
let mut stream = self.stream.lock().expect("mutex is bad");
158+
let mut stream = self.stream.lock();
158159
stream.poll_next_unpin(cx).map(|x| match x {
159160
Some(flight_data_chunk_result) => {
160161
let converted_chunk = flight_data_chunk_result

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
use std::fs::File;
2424
use std::iter::Iterator;
2525
use std::path::PathBuf;
26-
use std::sync::{Arc, Mutex};
26+
use std::sync::Arc;
27+
use parking_lot::Mutex;
2728
use std::time::Instant;
2829
use std::{any::Any, pin::Pin};
2930

ballista/rust/executor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
4646
tonic = "0.6"
4747
uuid = { version = "0.8", features = ["v4"] }
4848
hyper = "0.14.4"
49+
parking_lot = "0.11"
4950

5051
[dev-dependencies]
5152

ballista/rust/scheduler/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ tokio-stream = { version = "0.1", features = ["net"], optional = true }
5353
tonic = "0.6"
5454
tower = { version = "0.4" }
5555
warp = "0.3"
56+
parking_lot = "0.11"
5657

5758
[dev-dependencies]
5859
ballista-core = { path = "../core", version = "0.6.0" }

datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ avro-rs = { version = "0.13", features = ["snappy"], optional = true }
7878
num-traits = { version = "0.2", optional = true }
7979
pyo3 = { version = "0.15", optional = true }
8080
tempfile = "3"
81+
parking_lot = "0.11"
8182

8283
[dev-dependencies]
8384
criterion = "0.3"

datafusion/benches/aggregate_query_sql.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ use crate::criterion::Criterion;
2525
use data_utils::create_table_provider;
2626
use datafusion::error::Result;
2727
use datafusion::execution::context::ExecutionContext;
28-
use std::sync::{Arc, Mutex};
28+
use std::sync::Arc;
29+
use parking_lot::Mutex;
2930
use tokio::runtime::Runtime;
3031

3132
fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
3233
let rt = Runtime::new().unwrap();
33-
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
34+
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
3435
criterion::black_box(rt.block_on(df.collect()).unwrap());
3536
}
3637

datafusion/benches/math_query_sql.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
extern crate criterion;
2020
use criterion::Criterion;
2121

22-
use std::sync::{Arc, Mutex};
22+
use std::sync::Arc;
23+
use parking_lot::Mutex;
2324

2425
use tokio::runtime::Runtime;
2526

@@ -40,7 +41,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
4041
let rt = Runtime::new().unwrap();
4142

4243
// execute the query
43-
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
44+
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
4445
rt.block_on(df.collect()).unwrap();
4546
}
4647

0 commit comments

Comments
 (0)