Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl BallistaContext {
&partition_id.job_id,
partition_id.stage_id as usize,
partition_id.partition_id as usize,
&location.path,
)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
Expand Down
27 changes: 25 additions & 2 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ message PhysicalNegativeNode {
}

message UnresolvedShuffleExecNode {
repeated uint32 query_stage_ids = 1;
uint32 query_stage_ids = 1;
Schema schema = 2;
uint32 partition_count = 3;
}
Expand Down Expand Up @@ -706,7 +706,7 @@ message Action {
ExecutePartition execute_partition = 2;

// Fetch a partition from an executor
PartitionId fetch_partition = 3;
FetchPartition fetch_partition = 3;
}

// configuration settings
Expand All @@ -722,18 +722,27 @@ message ExecutePartition {
repeated PartitionLocation partition_location = 5;
}

message FetchPartition {
string job_id = 1;
uint32 stage_id = 2;
uint32 partition_id = 3;
string path = 4;
}

// Mapping from partition id to executor id
message PartitionLocation {
PartitionId partition_id = 1;
ExecutorMetadata executor_meta = 2;
PartitionStats partition_stats = 3;
string path = 4;
}

// Unique identifier for a materialized partition of data
message PartitionId {
string job_id = 1;
uint32 stage_id = 2;
uint32 partition_id = 4;
// string path = 5;
}

message PartitionStats {
Expand Down Expand Up @@ -782,6 +791,17 @@ message FailedTask {

message CompletedTask {
string executor_id = 1;
// TODO tasks are currently always shuffle writes but this will not always be the case
// so we might want to think about some refactoring of the task definitions
repeated ShuffleWritePartition partitions = 2;
}

message ShuffleWritePartition {
uint64 partition_id = 1;
string path = 2;
uint64 num_batches = 3;
uint64 num_rows = 4;
uint64 num_bytes = 5;
}

message TaskStatus {
Expand All @@ -803,6 +823,9 @@ message PollWorkParams {
message TaskDefinition {
PartitionId task_id = 1;
PhysicalPlanNode plan = 2;
// TODO tasks are currently always shuffle writes but this will not always be the case
// so we might want to think about some refactoring of the task definitions
PhysicalHashRepartition shuffle_output_partitioning = 3;
}

message PollWorkResult {
Expand Down
9 changes: 7 additions & 2 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,14 @@ impl BallistaClient {
job_id: &str,
stage_id: usize,
partition_id: usize,
path: &str,
) -> Result<SendableRecordBatchStream> {
let action =
Action::FetchPartition(PartitionId::new(job_id, stage_id, partition_id));
let action = Action::FetchPartition {
job_id: job_id.to_string(),
stage_id,
partition_id,
path: path.to_owned(),
};
self.execute_action(&action).await
}

Expand Down
18 changes: 16 additions & 2 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::client::BallistaClient;
use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionLocation;

use crate::error::BallistaError;
use crate::utils::WrappedStream;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -96,7 +97,11 @@ impl ExecutionPlan for ShuffleReaderExec {
&self,
partition: usize,
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
info!("ShuffleReaderExec::execute({})", partition);
info!(
"ShuffleReaderExec::execute({}) fetching {} shuffle partitions",
partition,
self.partition.len()
);

let start = Instant::now();
let partition_locations = &self.partition[partition];
Expand Down Expand Up @@ -127,11 +132,12 @@ impl ExecutionPlan for ShuffleReaderExec {
x.iter()
.map(|l| {
format!(
"[executor={} part={}:{}:{} stats={}]",
"[executor={} part={}:{}:{} file={} stats={}]",
l.executor_meta.id,
l.partition_id.job_id,
l.partition_id.stage_id,
l.partition_id.partition_id,
l.path,
l.partition_stats
)
})
Expand All @@ -157,6 +163,13 @@ async fn fetch_partition(
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
let metadata = &location.executor_meta;
let partition_id = &location.partition_id;

// if partition_id.path.is_empty() {
// return Err(DataFusionError::Internal(
// "Can fetch partition without path".to_string(),
// ));
// }

let mut ballista_client =
BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
.await
Expand All @@ -166,6 +179,7 @@ async fn fetch_partition(
&partition_id.job_id,
partition_id.stage_id as usize,
partition_id.partition_id as usize,
&location.path,
)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
Expand Down
Loading