Skip to content

Commit

Permalink
Merge branch 'main' of github.com:MaterializeInc/materialize into pgw…
Browse files Browse the repository at this point in the history
…ire-extended
  • Loading branch information
jkosh44 committed Aug 2, 2022
2 parents bd13644 + 6d7b623 commit cc9e446
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 14 deletions.
29 changes: 19 additions & 10 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,13 +489,13 @@ impl CatalogState {
id: ComputeInstanceId,
name: String,
introspection: Option<ComputeInstanceIntrospectionConfig>,
introspection_sources: Vec<(&'static BuiltinLog, GlobalId)>,
introspection_source_indexes: Vec<(&'static BuiltinLog, GlobalId)>,
) {
let logging = match introspection {
None => None,
Some(introspection) => {
let mut active_logs = BTreeMap::new();
for (log, index_id) in introspection_sources {
for (log, index_id) in introspection_source_indexes {
let source_name = FullObjectName {
database: RawDatabaseSpecifier::Ambient,
schema: log.schema.into(),
Expand Down Expand Up @@ -3025,7 +3025,7 @@ impl<S: Append> Catalog<S> {
name: String,
config: Option<ComputeInstanceIntrospectionConfig>,
// These are the legacy, active logs of this compute instance
introspection_sources: Vec<(&'static BuiltinLog, GlobalId)>,
introspection_source_indexes: Vec<(&'static BuiltinLog, GlobalId)>,
},
CreateComputeInstanceReplica {
id: ReplicaId,
Expand Down Expand Up @@ -3165,14 +3165,15 @@ impl<S: Append> Catalog<S> {
Op::CreateComputeInstance {
name,
config,
introspection_sources,
introspection_source_indexes,
} => {
if is_reserved_name(&name) {
return Err(AdapterError::Catalog(Error::new(
ErrorKind::ReservedClusterName(name),
)));
}
let id = tx.insert_compute_instance(&name, &config, &introspection_sources)?;
let id =
tx.insert_compute_instance(&name, &config, &introspection_source_indexes)?;
self.add_to_audit_log(
session,
&mut tx,
Expand All @@ -3185,7 +3186,7 @@ impl<S: Append> Catalog<S> {
id,
name,
config,
introspection_sources,
introspection_source_indexes,
}]
}
Op::CreateComputeInstanceReplica {
Expand Down Expand Up @@ -3595,13 +3596,21 @@ impl<S: Append> Catalog<S> {
id,
name,
config,
introspection_sources,
introspection_source_indexes,
} => {
info!("create cluster {}", name);
let introspection_source_index_ids: Vec<GlobalId> =
introspection_sources.iter().map(|(_, id)| *id).collect();
introspection_source_indexes
.iter()
.map(|(_, id)| *id)
.collect();
state
.insert_compute_instance(id, name.clone(), config, introspection_sources)
.insert_compute_instance(
id,
name.clone(),
config,
introspection_source_indexes,
)
.await;
builtin_table_updates.push(state.pack_compute_instance_update(&name, 1));
for id in introspection_source_index_ids {
Expand Down Expand Up @@ -4030,7 +4039,7 @@ pub enum Op {
CreateComputeInstance {
name: String,
config: Option<ComputeInstanceIntrospectionConfig>,
introspection_sources: Vec<(&'static BuiltinLog, GlobalId)>,
introspection_source_indexes: Vec<(&'static BuiltinLog, GlobalId)>,
},
CreateComputeInstanceReplica {
name: String,
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/catalog/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ impl<'a, S: Append> Transaction<'a, S> {
&mut self,
cluster_name: &str,
config: &Option<ComputeInstanceIntrospectionConfig>,
introspection_sources: &Vec<(&'static BuiltinLog, GlobalId)>,
introspection_source_indexes: &Vec<(&'static BuiltinLog, GlobalId)>,
) -> Result<ComputeInstanceId, Error> {
let id = self.get_and_increment_id(COMPUTE_ID_ALLOC_KEY.to_string())?;
let config = serde_json::to_string(config)
Expand All @@ -935,7 +935,7 @@ impl<'a, S: Append> Transaction<'a, S> {
)));
};

for (builtin, index_id) in introspection_sources {
for (builtin, index_id) in introspection_source_indexes {
let index_id = if let GlobalId::System(id) = index_id {
*id
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use chrono::{DateTime, Utc};
use derivative::Derivative;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::tracing::OpenTelemetryContext;
use rand::seq::SliceRandom;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tokio::runtime::Handle as TokioHandle;
Expand Down Expand Up @@ -183,6 +184,7 @@ pub struct CreateSourceStatementReady {
pub params: Params,
pub depends_on: Vec<GlobalId>,
pub original_stmt: Statement<Raw>,
pub otel_ctx: OpenTelemetryContext,
}

#[derive(Derivative)]
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

use std::sync::Arc;

use mz_ore::tracing::OpenTelemetryContext;
use rand::Rng;
use tokio::sync::{oneshot, watch};
use tracing::Instrument;
Expand Down Expand Up @@ -419,6 +420,7 @@ impl<S: Append + 'static> Coordinator<S> {
stmt,
self.connection_context.clone(),
);
let otel_ctx = OpenTelemetryContext::obtain();
task::spawn(|| format!("purify:{conn_id}"), async move {
let result = purify_fut.await.map_err(|e| e.into());
internal_cmd_tx
Expand All @@ -430,6 +432,7 @@ impl<S: Append + 'static> Coordinator<S> {
params,
depends_on,
original_stmt,
otel_ctx,
},
))
.expect("sending to internal_cmd_tx cannot fail");
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,11 @@ impl<S: Append + 'static> Coordinator<S> {
params,
depends_on,
original_stmt,
otel_ctx,
}: CreateSourceStatementReady,
) {
otel_ctx.attach_as_parent();

let stmt = match result {
Ok(stmt) => stmt,
Err(e) => return tx.send(Err(e), session),
Expand Down
16 changes: 14 additions & 2 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,15 +579,19 @@ impl<S: Append + 'static> Coordinator<S> {
}: CreateComputeInstancePlan,
) -> Result<ExecuteResponse, AdapterError> {
tracing::debug!("sequence_create_compute_instance");
let introspection_sources = if compute_instance_config.is_some() {
let introspection_source_indexes = if compute_instance_config.is_some() {
self.catalog.allocate_introspection_source_indexes().await
} else {
Vec::new()
};
let introspection_source_index_ids: Vec<_> = introspection_source_indexes
.iter()
.map(|(_, id)| *id)
.collect();
let mut ops = vec![catalog::Op::CreateComputeInstance {
name: name.clone(),
config: compute_instance_config.clone(),
introspection_sources,
introspection_source_indexes,
}];

let azs = self.catalog.state().availability_zones();
Expand Down Expand Up @@ -690,6 +694,14 @@ impl<S: Append + 'static> Coordinator<S> {
.unwrap();
}

if !introspection_source_index_ids.is_empty() {
self.initialize_compute_read_policies(
introspection_source_index_ids,
instance.id,
DEFAULT_LOGICAL_COMPACTION_WINDOW_MS,
)
.await;
}
if !introspection_collection_ids.is_empty() {
self.initialize_storage_read_policies(
introspection_collection_ids,
Expand Down
10 changes: 10 additions & 0 deletions src/persist/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ impl PostgresConsensus {
// The lock ID was randomly generated.
tx.batch_execute("SELECT pg_advisory_xact_lock(135664303235462630);")
.await?;
} else {
// The `consensus` table creates and deletes rows at a high frequency, generating many
// tombstoned rows. If Cockroach's GC interval is set high (the default is 25h) and
// these tombstones accumulate, scanning over the table will take increasingly and
// prohibitively long.
//
// See: https://github.com/MaterializeInc/materialize/issues/13975
// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
tx.batch_execute("ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600;")
.await?;
}
tx.batch_execute(SCHEMA).await?;
tx.commit().await?;
Expand Down
29 changes: 29 additions & 0 deletions test/sqllogictest/cluster_log_compaction.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Ensure correct log creation on `CREATE CLUSTER` initializes read policies for introspection source indexes.

mode cockroach

statement ok
CREATE CLUSTER c1 REPLICAS (r (SIZE '1'));

statement ok
SET CLUSTER TO c1;

statement ok
BEGIN

# Transaction will force a read hold on this index.
query T rowsort
SELECT * FROM mz_arrangement_batches_internal_2;
----

statement ok
COMMIT

0 comments on commit cc9e446

Please sign in to comment.