-
Notifications
You must be signed in to change notification settings - Fork 503
coord: Prioritize read latency over write latency #13967
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5024870
e6d5473
4188858
535bda1
a87ab70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,7 +82,7 @@ use rand::seq::SliceRandom; | |
| use timely::progress::{Antichain, Timestamp as TimelyTimestamp}; | ||
| use tokio::runtime::Handle as TokioHandle; | ||
| use tokio::select; | ||
| use tokio::sync::{mpsc, oneshot, watch}; | ||
| use tokio::sync::{mpsc, oneshot, watch, OwnedMutexGuard}; | ||
| use tracing::{span, Level}; | ||
| use uuid::Uuid; | ||
|
|
||
|
|
@@ -114,7 +114,7 @@ use crate::catalog::{ | |
| }; | ||
| use crate::client::{Client, ConnectionId, Handle}; | ||
| use crate::command::{Canceled, Command, ExecuteResponse}; | ||
| use crate::coord::appends::{AdvanceLocalInput, Deferred, PendingWriteTxn}; | ||
| use crate::coord::appends::{BuiltinTableUpdateSource, Deferred, PendingWriteTxn}; | ||
| use crate::coord::id_bundle::CollectionIdBundle; | ||
| use crate::coord::peek::PendingPeek; | ||
| use crate::coord::read_policy::{ReadCapability, ReadHolds}; | ||
|
|
@@ -123,7 +123,7 @@ use crate::error::AdapterError; | |
| use crate::session::{EndTransactionAction, Session}; | ||
| use crate::sink_connection; | ||
| use crate::tail::PendingTail; | ||
| use crate::util::ClientTransmitter; | ||
| use crate::util::{ClientTransmitter, CompletedClientTransmitter}; | ||
|
|
||
| pub(crate) mod id_bundle; | ||
| pub(crate) mod peek; | ||
|
|
@@ -155,11 +155,21 @@ pub enum Message<T = mz_repr::Timestamp> { | |
| SinkConnectionReady(SinkConnectionReady), | ||
| SendDiffs(SendDiffs), | ||
| WriteLockGrant(tokio::sync::OwnedMutexGuard<()>), | ||
| AdvanceTimelines, | ||
| AdvanceLocalInput(AdvanceLocalInput<T>), | ||
| GroupCommit, | ||
| /// Initiates a group commit. | ||
| GroupCommitInitiate, | ||
| /// Makes a group commit visible to all clients. | ||
| GroupCommitApply( | ||
| /// Timestamp of the writes in the group commit. | ||
| T, | ||
| /// Clients waiting on responses from the group commit. | ||
| Vec<CompletedClientTransmitter<ExecuteResponse>>, | ||
| /// Optional lock if the group commit contained writes to user tables. | ||
| Option<OwnedMutexGuard<()>>, | ||
| ), | ||
| ComputeInstanceStatus(ComputeInstanceEvent), | ||
| RemovePendingPeeks { conn_id: ConnectionId }, | ||
| RemovePendingPeeks { | ||
| conn_id: ConnectionId, | ||
| }, | ||
| LinearizeReads(Vec<PendingTxn>), | ||
| StorageUsage, | ||
| } | ||
|
|
@@ -658,7 +668,7 @@ impl<S: Append + 'static> Coordinator<S> { | |
| let WriteTimestamp { | ||
| timestamp: _, | ||
| advance_to, | ||
| } = self.get_and_step_local_write_ts().await; | ||
| } = self.get_local_write_ts().await; | ||
| let appends = entries | ||
| .iter() | ||
| .filter(|entry| entry.is_table()) | ||
|
|
@@ -694,7 +704,8 @@ impl<S: Append + 'static> Coordinator<S> { | |
| builtin_table_updates.extend(retractions); | ||
| } | ||
|
|
||
| self.send_builtin_table_updates(builtin_table_updates).await; | ||
| self.send_builtin_table_updates(builtin_table_updates, BuiltinTableUpdateSource::DDL) | ||
| .await; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
@@ -764,7 +775,7 @@ impl<S: Append + 'static> Coordinator<S> { | |
| } | ||
| // `tick()` on `Interval` is cancel-safe: | ||
| // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety | ||
| _ = advance_timelines_interval.tick() => Message::AdvanceTimelines, | ||
| _ = advance_timelines_interval.tick() => Message::GroupCommitInitiate, | ||
| _ = storage_usage_update_interval.tick() => Message::StorageUsage, | ||
| }; | ||
|
|
||
|
|
@@ -774,10 +785,6 @@ impl<S: Append + 'static> Coordinator<S> { | |
| let _enter = span.enter(); | ||
|
|
||
| self.handle_message(msg).await; | ||
|
|
||
| if let Some(timestamp) = self.get_local_timestamp_oracle_mut().should_advance_to() { | ||
| self.queue_local_input_advances(timestamp).await; | ||
| } | ||
|
Comment on lines -778 to -780
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. By removing this, do we risk making the table advancement a low-priority event? It seems that removing this takes it from "once per message received" to "lowest priority event".
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Queuing a table advancement was happening at a "once per message received" rate. However, actually executing the table advancements was already the lowest-priority message that the Coordinator handled. In practice, the only things that triggered this were group commit, DDL, and the |
||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
If we wanted to keep things moving, we could tick this each time the group commit completes.
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't be against that. My one fear is that we'd end up clogging the Coordinator up somehow, but since append is now done asynchronously it should be fine?