Skip to content

Commit

Permalink
Merge pull request MaterializeInc#24072 from aalexandrov/issue_16217_…
Browse files Browse the repository at this point in the history
…create_index

Move `CREATE INDEX` optimization off the coordinator thread
  • Loading branch information
aalexandrov authored Dec 22, 2023
2 parents d36fdfb + 41ea92b commit 95e524f
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 187 deletions.
46 changes: 46 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ pub enum Message<T = mz_repr::Timestamp> {
otel_ctx: OpenTelemetryContext,
stage: PeekStage,
},
CreateIndexStageReady {
ctx: ExecuteContext,
otel_ctx: OpenTelemetryContext,
stage: CreateIndexStage,
},
CreateViewStageReady {
ctx: ExecuteContext,
otel_ctx: OpenTelemetryContext,
Expand Down Expand Up @@ -286,6 +291,7 @@ impl Message {
"execute_single_statement_transaction"
}
Message::PeekStageReady { .. } => "peek_stage_ready",
Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
Message::CreateViewStageReady { .. } => "create_view_stage_ready",
Message::CreateMaterializedViewStageReady { .. } => {
"create_materialized_view_stage_ready"
Expand Down Expand Up @@ -446,6 +452,46 @@ pub struct PeekStageFinish {
global_mir_plan: optimize::peek::GlobalMirPlan,
}

#[derive(Debug)]
pub enum CreateIndexStage {
Validate(CreateIndexValidate),
Optimize(CreateIndexOptimize),
Finish(CreateIndexFinish),
}

impl CreateIndexStage {
fn validity(&mut self) -> Option<&mut PlanValidity> {
match self {
Self::Validate(_) => None,
Self::Optimize(stage) => Some(&mut stage.validity),
Self::Finish(stage) => Some(&mut stage.validity),
}
}
}

#[derive(Debug)]
pub struct CreateIndexValidate {
plan: plan::CreateIndexPlan,
resolved_ids: ResolvedIds,
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
validity: PlanValidity,
plan: plan::CreateIndexPlan,
resolved_ids: ResolvedIds,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
validity: PlanValidity,
id: GlobalId,
plan: plan::CreateIndexPlan,
resolved_ids: ResolvedIds,
global_mir_plan: optimize::index::GlobalMirPlan,
global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub enum CreateViewStage {
Validate(CreateViewValidate),
Expand Down
8 changes: 8 additions & 0 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ impl Coordinator {
otel_ctx.attach_as_parent();
self.sequence_peek_stage(ctx, otel_ctx, stage).await;
}
Message::CreateIndexStageReady {
ctx,
otel_ctx,
stage,
} => {
otel_ctx.attach_as_parent();
self.sequence_create_index_stage(ctx, stage, otel_ctx).await;
}
Message::CreateViewStageReady {
ctx,
otel_ctx,
Expand Down
5 changes: 1 addition & 4 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,7 @@ impl Coordinator {
.await;
}
Plan::CreateIndex(plan) => {
let result = self
.sequence_create_index(ctx.session_mut(), plan, resolved_ids)
.await;
ctx.retire(result);
self.sequence_create_index(ctx, plan, resolved_ids).await;
}
Plan::CreateType(plan) => {
let result = self
Expand Down
142 changes: 3 additions & 139 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,14 @@ use crate::error::AdapterError;
use crate::explain::explain_dataflow;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{
dataflow_import_id_bundle, prep_scalar_expr, EvalTime, ExprPrepStyle,
};
use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session, TransactionOps, TransactionStatus, WriteOp};
use crate::subscribe::ActiveSubscribe;
use crate::util::{viewable_variables, ClientTransmitter, ComputeSinkId, ResultExt};
use crate::{guard_write_critical_section, PeekResponseUnary, TimestampExplanation};

mod create_index;
mod create_materialized_view;
mod create_view;

Expand Down Expand Up @@ -856,141 +855,6 @@ impl Coordinator {
}
}

#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn sequence_create_index(
&mut self,
session: &mut Session,
plan: plan::CreateIndexPlan,
resolved_ids: ResolvedIds,
) -> Result<ExecuteResponse, AdapterError> {
let plan::CreateIndexPlan {
name,
index:
plan::Index {
create_sql,
on,
keys,
cluster_id,
},
options,
if_not_exists,
} = plan;

self.ensure_cluster_can_host_compute_item(&name, cluster_id)?;

// Collect optimizer parameters.
let compute_instance = self
.instance_snapshot(cluster_id)
.expect("compute instance does not exist");
let id = self.catalog_mut().allocate_user_id().await?;
let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());

// Build an optimizer for this INDEX.
let mut optimizer = optimize::index::Optimizer::new(
self.owned_catalog(),
compute_instance,
id,
optimizer_config,
);

// MIR ⇒ MIR optimization (global)
let index_plan = optimize::index::Index::new(&name, &on, &keys);
let global_mir_plan = optimizer.optimize(index_plan)?;
// MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
let global_lir_plan = optimizer.optimize(global_mir_plan.clone())?;

let index = mz_catalog::memory::objects::Index {
create_sql,
keys,
on,
conn_id: None,
resolved_ids,
cluster_id,
is_retained_metrics_object: false,
custom_logical_compaction_window: None,
};

let oid = self.catalog_mut().allocate_oid()?;
let on = self.catalog().get_entry(&index.on);
// Indexes have the same owner as their parent relation.
let owner_id = *on.owner_id();
let op = catalog::Op::CreateItem {
id,
oid,
name: name.clone(),
item: CatalogItem::Index(index),
owner_id,
};

let transact_result = self
.catalog_transact_with_side_effects(Some(session), vec![op], |coord| async {
// Save plan structures.
coord
.catalog_mut()
.set_optimized_plan(id, global_mir_plan.df_desc().clone());
coord
.catalog_mut()
.set_physical_plan(id, global_lir_plan.df_desc().clone());

let (mut df_desc, df_meta) = global_lir_plan.unapply();

// Timestamp selection
let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id);
let since = coord.least_valid_read(&id_bundle);
df_desc.set_as_of(since);

// Emit notices.
coord.emit_optimizer_notices(session, &df_meta.optimizer_notices);

// Notices rendering
let df_meta = coord.catalog().render_notices(df_meta, Some(id));
coord
.catalog_mut()
.set_dataflow_metainfo(id, df_meta.clone());

if coord.catalog().state().system_config().enable_mz_notices() {
// Initialize a container for builtin table updates.
let mut builtin_table_updates =
Vec::with_capacity(df_meta.optimizer_notices.len());
// Collect optimization hint updates.
coord.catalog().pack_optimizer_notices(
&mut builtin_table_updates,
df_meta.optimizer_notices.iter(),
1,
);
// Write collected optimization hints to the builtin tables.
let builtin_updates_fut = coord
.builtin_table_update()
.execute(builtin_table_updates)
.await;

let ship_dataflow_fut = coord.ship_dataflow(df_desc, cluster_id);

futures::future::join(builtin_updates_fut, ship_dataflow_fut).await;
} else {
coord.ship_dataflow(df_desc, cluster_id).await;
}

coord.set_index_options(id, options).expect("index enabled");
})
.await;

match transact_result {
Ok(_) => Ok(ExecuteResponse::CreatedIndex),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if if_not_exists => {
session.add_notice(AdapterNotice::ObjectAlreadyExists {
name: name.item,
ty: "index",
});
Ok(ExecuteResponse::CreatedIndex)
}
Err(err) => Err(err),
}
}

#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn sequence_create_type(
&mut self,
Expand Down Expand Up @@ -1951,7 +1815,7 @@ impl Coordinator {
.await;
}

/// Processes as many peek stages as possible.
/// Processes as many `peek` stages as possible.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn sequence_peek_stage(
&mut self,
Expand Down
Loading

0 comments on commit 95e524f

Please sign in to comment.