Skip to content

Commit

Permalink
Implement a stream recv_many to avoid potential starvation issue; add…
Browse files Browse the repository at this point in the history
…ress other PR feedback
  • Loading branch information
rjobanp committed Dec 5, 2023
1 parent 72d202d commit daa77c2
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 64 deletions.
6 changes: 3 additions & 3 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -735,14 +735,14 @@ messages and additional metadata helpful for debugging.
### `mz_aws_privatelink_connection_status_history`

The `mz_aws_privatelink_connection_status_history` table contains a row describing
the historical status for each PrivateLink connection.
the historical status for each AWS PrivateLink connection in the system.

<!-- RELATION_SPEC mz_internal.mz_aws_privatelink_connection_status_history -->
| Field | Type | Meaning |
|-------------------|----------------------------|------------------------------------------------------------|
| `occurred_at` | `timestamp with time zone` | Wall-clock timestamp of the status change. |
| `connection_id` | `text` | The unique identifier of the AWS PrivateLink connection. Corresponds to [`mz_catalog.mz_connections.id`](../mz_catalog#mz_connections) |
| `status` | `text` | The status: one of `pending-service-discovery`, `creating-endpoint`, `recreating-endpoint`, `updating-endpoint`, `available`, `deleted`, `deleting`, `expired`, `failed`, `pending`, `pending-acceptance`, `rejected`, `unknown` |
| `connection_id` | `text` | The unique identifier of the AWS PrivateLink connection. Corresponds to [`mz_catalog.mz_connections.id`](../mz_catalog#mz_connections). |
| `status` | `text` | The status of the connection: one of `pending-service-discovery`, `creating-endpoint`, `recreating-endpoint`, `updating-endpoint`, `available`, `deleted`, `deleting`, `expired`, `failed`, `pending`, `pending-acceptance`, `rejected`, or `unknown`. |


<!--
Expand Down
25 changes: 25 additions & 0 deletions misc/cargo-vet/audits.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,31 @@ who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
version = "0.12.0@git:fca940b40e7827a9a639f99af45806100ef14954"

[[audits.futures-timer]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "3.0.2"

[[audits.governor]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "0.6.0"

[[audits.nonzero_ext]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "0.3.0"

[[audits.quanta]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "0.11.1"

[[audits.raw-cpuid]]
who = "Roshan Jobanputra <roshan@materialize.com>"
criteria = "safe-to-deploy"
version = "10.7.0"

[[audits.timely]]
who = "Moritz Hoffmann <mh@materialize.com>"
criteria = "safe-to-deploy"
Expand Down
17 changes: 17 additions & 0 deletions misc/cargo-vet/imports.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,17 @@ criteria = "safe-to-deploy"
version = "1.0.1"
notes = "No unsafe usage or ambient capabilities"

[[audits.embark-studios.audits.no-std-compat]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
version = "0.2.0"
notes = "No unsafe usage or ambient capabilities"

[[audits.embark-studios.audits.no-std-compat]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
delta = "0.2.0 -> 0.4.1"

[[audits.embark-studios.audits.similar]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
Expand Down Expand Up @@ -1442,6 +1453,12 @@ criteria = "safe-to-deploy"
version = "0.4.17"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"

[[audits.mozilla.audits.mach2]]
who = "Gabriele Svelto <gsvelto@mozilla.com>"
criteria = "safe-to-deploy"
version = "0.4.1"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"

[[audits.mozilla.audits.matches]]
who = "Bobby Holley <bobbyholley@gmail.com>"
criteria = "safe-to-deploy"
Expand Down
28 changes: 12 additions & 16 deletions src/adapter/src/coord/privatelink_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
use std::num::NonZeroU32;
use std::{collections::BTreeMap, sync::Arc};

use futures::stream::StreamExt;
use futures::FutureExt;
use governor::{Quota, RateLimiter};

use mz_cloud_resources::VpcEndpointEvent;
use mz_ore::future::OreStreamExt;
use mz_ore::task::spawn;
use mz_repr::{Datum, GlobalId, Row};
use mz_storage_client::controller::IntrospectionType;
Expand Down Expand Up @@ -50,35 +49,32 @@ impl Coordinator {
));

loop {
// Wait for an event to become available
if let Some(next_event) = stream.next().await {
// Wait until we're permitted to tell the coordinator about the event
// Wait for events to become available
if let Some(new_events) = stream.recv_many(20).await {
// Wait until we're permitted to tell the coordinator about the events
// Note that the stream is backed by a https://docs.rs/kube/latest/kube/runtime/fn.watcher.html,
// which means its safe for us to rate limit for an arbitrarily long time and expect the stream
// to continue to work, despite not being polled
rate_limiter.until_ready().await;

// Events to be written, de-duped by connection_id
let mut events = BTreeMap::new();
let mut event_map = BTreeMap::new();

let mut add_event = |event: VpcEndpointEvent| {
for event in new_events {
match last_written_events.get(&event.connection_id) {
// Ignore if an event with this time was already written
Some(time) if time >= &event.time => {}
_ => {
last_written_events
.insert(event.connection_id.clone(), event.time.clone());
events.insert(event.connection_id, event);
event_map.insert(event.connection_id, event);
}
}
};

add_event(next_event);
// Drain any additional events from the queue that accumulated while
// waiting for the rate limiter
while let Some(event) = stream.next().now_or_never().and_then(|e| e) {
add_event(event);
}

// Send the event batch to the coordinator to be written
let _ = internal_cmd_tx.send(Message::PrivateLinkVpcEndpointEvents(events));
let _ =
internal_cmd_tx.send(Message::PrivateLinkVpcEndpointEvents(event_map));
}
}
});
Expand Down
67 changes: 30 additions & 37 deletions src/orchestrator-kubernetes/src/cloud_resource_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,45 +112,38 @@ impl CloudResourceController for KubernetesOrchestrator {
.filter_map(|object| async move {
match object {
Ok(vpce) => {
if let Some(connection_id) =
mz_cloud_resources::id_from_vpc_endpoint_name(&vpce.name_any())
let connection_id =
mz_cloud_resources::id_from_vpc_endpoint_name(&vpce.name_any())?;

if let Some(state) = vpce.status.as_ref().and_then(|st| st.state.to_owned())
{
if let Some(state) =
vpce.status.as_ref().and_then(|st| st.state.to_owned())
{
return Some(VpcEndpointEvent {
connection_id,
status: state,
// Use the 'Available' Condition on the VPCE Status to set the event-time, falling back
// to now if it's not set
time: vpce
.status
.unwrap()
.conditions
.and_then(|c| {
c.into_iter().find(|c| &c.type_ == "Available")
})
.and_then(|condition| {
Some(condition.last_transition_time.0)
})
.unwrap_or_else(Utc::now),
});
} else {
// The Status/State is not yet populated on the VpcEndpoint, which means it was just
// initialized and hasn't yet been reconciled by the environment-controller
// We return an event with an 'unknown' state so that watchers know the VpcEndpoint was created
// even if we don't yet have an accurate status
return Some(VpcEndpointEvent {
connection_id,
status: VpcEndpointState::Unknown,
time: vpce.creation_timestamp().expect("Will exist").0,
});
}
// TODO: Should we also check for the deletion_timestamp on the vpce? That would indicate that the
// resource is about to be deleted; however there is already a 'deleted' enum val on VpcEndpointState
// which refers to the state of the customer's VPC Endpoint Service, so we'd need to introduce a new state val
Some(VpcEndpointEvent {
connection_id,
status: state,
// Use the 'Available' Condition on the VPCE Status to set the event-time, falling back
// to now if it's not set
time: vpce
.status
.unwrap()
.conditions
.and_then(|c| c.into_iter().find(|c| &c.type_ == "Available"))
.and_then(|condition| Some(condition.last_transition_time.0))
.unwrap_or_else(Utc::now),
})
} else {
// The Status/State is not yet populated on the VpcEndpoint, which means it was just
// initialized and hasn't yet been reconciled by the environment-controller
// We return an event with an 'unknown' state so that watchers know the VpcEndpoint was created
// even if we don't yet have an accurate status
Some(VpcEndpointEvent {
connection_id,
status: VpcEndpointState::Unknown,
time: vpce.creation_timestamp()?.0,
})
}
None
// TODO: Should we also check for the deletion_timestamp on the vpce? That would indicate that the
// resource is about to be deleted; however there is already a 'deleted' enum val on VpcEndpointState
// which refers to the state of the customer's VPC Endpoint Service, so we'd need to introduce a new state val
}
Err(error) => {
// We assume that errors returned by Kubernetes are usually transient, so we
Expand Down
49 changes: 49 additions & 0 deletions src/ore/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ use std::panic::UnwindSafe;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_trait::async_trait;
use futures::future::{CatchUnwind, FutureExt};
use futures::sink::Sink;
use futures::Stream;
use pin_project::pin_project;
use tokio::task::futures::TaskLocalFuture;
use tokio::time::{self, Duration, Instant};
Expand Down Expand Up @@ -375,3 +377,50 @@ impl<T, E> Sink<T> for DevNull<T, E> {
Poll::Ready(Ok(()))
}
}

/// Extension methods for streams.
#[async_trait]
pub trait OreStreamExt: Stream {
/// Awaits the stream for an event to be available and returns all currently buffered
/// events on the stream up to some `max`.
///
/// This method returns `None` if the stream has ended.
///
/// If there are no events ready on the stream this method will sleep until an event is
/// sent or the stream is closed. When woken it will return up to `max` currently buffered
/// events.
///
/// # Cancel safety
///
/// This method is cancel safe. If `recv_many` is used as the event in a `select!` statement
/// and some other branch completes first, it is guaranteed that no messages were received on
/// this channel.
async fn recv_many(&mut self, max: usize) -> Option<Vec<Self::Item>>;
}

#[async_trait]
impl<T> OreStreamExt for T
where
T: futures::stream::Stream + futures::StreamExt + Send + Unpin,
{
async fn recv_many(&mut self, max: usize) -> Option<Vec<Self::Item>> {
// Wait for an event to be ready on the stream
let first = self.next().await?;
let mut buffer = Vec::from([first]);

// Note(parkmycar): It's very important for cancelation safety that we don't add any more
// .await points other than the initial one.

// Pull all other ready events off the stream, up to the max
while let Some(v) = self.next().now_or_never().and_then(|e| e) {
buffer.push(v);

// Break so we don't loop here continuously.
if buffer.len() >= max {
break;
}
}

Some(buffer)
}
}
45 changes: 37 additions & 8 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,11 +924,34 @@ where
.await;
}
IntrospectionType::PrivatelinkConnectionStatusHistory => {
// Truncate the private link connection status history table and
// store the latest timestamp for each id in the table.
let occurred_at_col =
healthcheck::MZ_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC
.get_by_name(&ColumnName::from("occurred_at"))
.expect("schema has not changed")
.0;
self.privatelink_status_table_latest = self
.partially_truncate_status_history(
IntrospectionType::PrivatelinkConnectionStatusHistory,
)
.await;
.await
.and_then(|map| {
Some(
map.into_iter()
.map(|(id, row)| {
(
id,
row.iter()
.nth(occurred_at_col)
.expect("schema has not changed")
.unwrap_timestamptz()
.into(),
)
})
.collect(),
)
});
}

// Truncate compute-maintained collections.
Expand Down Expand Up @@ -2628,11 +2651,11 @@ where

/// Effectively truncates the source status history shard except for the most recent updates
/// from each ID.
/// Returns a map with the timestamp of the latest row per id
/// Returns a map with latest row per id
async fn partially_truncate_status_history(
&mut self,
collection: IntrospectionType,
) -> Option<BTreeMap<GlobalId, DateTime<Utc>>> {
) -> Option<BTreeMap<GlobalId, Row>> {
let (keep_n, occurred_at_col, id_col) = match collection {
IntrospectionType::SourceStatusHistory => (
self.config.keep_n_source_status_history_entries,
Expand Down Expand Up @@ -2689,7 +2712,7 @@ where
BTreeMap::new();

// BTreeMap to keep track of the row with the latest timestamp for each id
let mut latest_row_per_id: BTreeMap<Datum, CheckedTimestamp<DateTime<Utc>>> =
let mut latest_row_per_id: BTreeMap<Datum, (CheckedTimestamp<DateTime<Utc>>, Vec<Datum>)> =
BTreeMap::new();

// Consolidate the snapshot, so we can process it correctly below.
Expand All @@ -2716,9 +2739,9 @@ where
// Keep track of the timestamp of the latest row per id
let timestamp = occurred_at.unwrap_timestamptz();
match latest_row_per_id.get(&id) {
Some(existing) if existing > &timestamp => {}
Some(existing) if &existing.0 > &timestamp => {}
_ => {
latest_row_per_id.insert(id, timestamp);
latest_row_per_id.insert(id, (timestamp, status_row.clone()));
}
}

Expand Down Expand Up @@ -2764,11 +2787,17 @@ where
Some(
latest_row_per_id
.into_iter()
.filter_map(|(key, timestamp)| {
.filter_map(|(key, (_, row_vec))| {
match GlobalId::from_str(key.unwrap_str()) {
Ok(id) => Some((id, timestamp.into())),
Ok(id) => {
let mut packer = row_buf.packer();
packer.extend(row_vec.into_iter());
Some((id, row_buf.clone()))
}
// Ignore any rows that can't be unwrapped correctly
Err(_) => None,
// TODO(rjobanp): add a dropped status so we can ignore rows that are
// no longer relevant (e.g. dropped connections/sources/sinks)
}
})
.collect(),
Expand Down

0 comments on commit daa77c2

Please sign in to comment.