Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: avoid returning metrics in non-analyze sql #1410

Merged
merged 3 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = "d849fa4" }
ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = "cfdaccc" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
Expand Down
8 changes: 7 additions & 1 deletion df_engine_extensions/src/dist_sql_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,19 @@ type ExecutableScanBuilderRef = Box<dyn ExecutableScanBuilder>;
pub struct RemoteTaskContext {
pub task_ctx: Arc<TaskContext>,
pub remote_metrics: Arc<Mutex<Option<String>>>,
pub is_analyze: bool,
}

impl RemoteTaskContext {
pub fn new(task_ctx: Arc<TaskContext>, remote_metrics: Arc<Mutex<Option<String>>>) -> Self {
pub fn new(
task_ctx: Arc<TaskContext>,
remote_metrics: Arc<Mutex<Option<String>>>,
is_analyze: bool,
) -> Self {
Self {
task_ctx,
remote_metrics,
is_analyze,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion df_engine_extensions/src/dist_sql_query/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ impl ExecutionPlan for ResolvedPartitionedScan {
remote_metrics,
} = &self.remote_exec_ctx.plan_ctxs[partition];

let remote_task_ctx = RemoteTaskContext::new(context, remote_metrics.clone());
let remote_task_ctx =
RemoteTaskContext::new(context, remote_metrics.clone(), self.is_analyze);

// Send plan for remote execution.
let stream_future = self.remote_exec_ctx.executor.execute(
Expand Down
1 change: 1 addition & 0 deletions query_engine/src/datafusion_impl/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl {
default_schema,
query: display_plan.indent(true).to_string(),
priority,
is_analyze: task_context.is_analyze,
};

// Encode plan and schema
Expand Down
59 changes: 33 additions & 26 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,17 @@ impl<M: MetricCollector> Drop for StreamWithMetric<M> {

struct RemoteExecStream {
inner: BoxStream<'static, Result<RecordBatch>>,
physical_plan: Option<PhysicalPlanRef>,
physical_plan_for_explain: Option<PhysicalPlanRef>,
}

impl RemoteExecStream {
fn new(
inner: BoxStream<'static, Result<RecordBatch>>,
physical_plan: Option<PhysicalPlanRef>,
physical_plan_for_explain: Option<PhysicalPlanRef>,
) -> Self {
Self {
inner,
physical_plan,
physical_plan_for_explain,
}
}
}
Expand All @@ -240,19 +240,25 @@ impl Stream for RemoteExecStream {

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.inner.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => {
Poll::Ready(Some(res.map(RecordBatchWithMetric::RecordBatch)))
}
Poll::Ready(None) => match &this.physical_plan {
Some(physical_plan) => {
let metrics = physical_plan.metrics_to_string();
this.physical_plan = None;
Poll::Ready(Some(Ok(RecordBatchWithMetric::Metric(metrics))))
let is_explain = this.physical_plan_for_explain.is_some();
loop {
match this.inner.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => {
// If the request is explain, we try drain the stream to get the metrics.
if !is_explain {
return Poll::Ready(Some(res.map(RecordBatchWithMetric::RecordBatch)));
}
}
None => Poll::Ready(None),
},
Poll::Pending => Poll::Pending,
Poll::Ready(None) => match &this.physical_plan_for_explain {
Some(physical_plan) => {
let metrics = physical_plan.metrics_to_string();
this.physical_plan_for_explain = None;
return Poll::Ready(Some(Ok(RecordBatchWithMetric::Metric(metrics))));
}
None => return Poll::Ready(None),
},
Poll::Pending => return Poll::Pending,
}
}
}
}
Expand Down Expand Up @@ -715,15 +721,16 @@ impl RemoteEngineServiceImpl {
slow_threshold_secs,
query_ctx.priority,
);
let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
encoded_plan,
)));
let physical_plan: PhysicalPlanRef = Arc::new(DataFusionPhysicalPlanAdapter::new(
TypedPlan::Remote(encoded_plan),
));
// TODO: Use in handle_execute_plan fn to build stream with metrics
let physical_plan_for_explain = ctx.explain.map(|_| physical_plan.clone());

let rt = self
.runtimes
.read_runtime
.choose_runtime(&query_ctx.priority);
let physical_plan_clone = physical_plan.clone();

let stream = rt
.spawn(async move { handle_execute_plan(query_ctx, physical_plan, query_engine).await })
Expand All @@ -743,7 +750,7 @@ impl RemoteEngineServiceImpl {
let stream = StreamWithMetric::new(Box::pin(stream), metric);
Ok(RemoteExecStream::new(
Box::pin(stream),
Some(physical_plan_clone),
physical_plan_for_explain,
))
}

Expand Down Expand Up @@ -778,11 +785,11 @@ impl RemoteEngineServiceImpl {
encoded_plan: encoded_plan.clone(),
};

let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
encoded_plan,
)));

let physical_plan_clone = physical_plan.clone();
let physical_plan: PhysicalPlanRef = Arc::new(DataFusionPhysicalPlanAdapter::new(
TypedPlan::Remote(encoded_plan),
));
// TODO: Use in handle_execute_plan fn to build stream with metrics
let physical_plan_for_explain = ctx.explain.map(|_| physical_plan.clone());

let QueryDedup {
config,
Expand Down Expand Up @@ -822,7 +829,7 @@ impl RemoteEngineServiceImpl {
let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), metric);
Ok(RemoteExecStream::new(
Box::pin(stream),
Some(physical_plan_clone),
physical_plan_for_explain,
))
}

Expand Down
12 changes: 12 additions & 0 deletions table_engine/src/remote/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ pub struct ExecContext {
pub default_schema: String,
pub query: String,
pub priority: Priority,
// TOOO: there are many explain types, we need to support them all.
// A proper way is to define a enum for all explain types.
pub is_analyze: bool,
}

pub enum PhysicalPlan {
Expand All @@ -470,6 +473,11 @@ impl From<RemoteExecuteRequest> for ceresdbproto::remote_engine::ExecutePlanRequ
NO_TIMEOUT
};

let explain = if value.context.is_analyze {
Some(ceresdbproto::remote_engine::Explain::Analyze)
} else {
None
};
let pb_context = ceresdbproto::remote_engine::ExecContext {
request_id: 0, // not used any more
request_id_str: value.context.request_id.to_string(),
Expand All @@ -478,6 +486,7 @@ impl From<RemoteExecuteRequest> for ceresdbproto::remote_engine::ExecutePlanRequ
timeout_ms: rest_duration_ms,
priority: value.context.priority.as_u8() as i32,
displayable_query: value.context.query,
explain: explain.map(|v| v as i32),
};

let pb_plan = match value.physical_plan {
Expand Down Expand Up @@ -522,8 +531,10 @@ impl TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_schema,
timeout_ms,
displayable_query,
explain,
..
} = pb_exec_ctx;
let is_analyze = explain == Some(ceresdbproto::remote_engine::Explain::Analyze as i32);

let request_id = RequestId::from(request_id_str);
let deadline = if timeout_ms >= 0 {
Expand All @@ -539,6 +550,7 @@ impl TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_schema,
query: displayable_query,
priority,
is_analyze,
};

// Plan
Expand Down
Loading