-
Notifications
You must be signed in to change notification settings - Fork 468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
coord: Prioritize read latency over write latency #13967
Changes from 1 commit
5024870
e6d5473
4188858
535bda1
a87ab70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,7 +82,7 @@ use rand::seq::SliceRandom; | |
use timely::progress::{Antichain, Timestamp as TimelyTimestamp}; | ||
use tokio::runtime::Handle as TokioHandle; | ||
use tokio::select; | ||
use tokio::sync::{mpsc, oneshot, watch}; | ||
use tokio::sync::{mpsc, oneshot, watch, OwnedMutexGuard}; | ||
use tracing::{span, Level}; | ||
use uuid::Uuid; | ||
|
||
|
@@ -113,7 +113,7 @@ use crate::catalog::{ | |
}; | ||
use crate::client::{Client, ConnectionId, Handle}; | ||
use crate::command::{Canceled, Command, ExecuteResponse}; | ||
use crate::coord::appends::{AdvanceLocalInput, Deferred, PendingWriteTxn}; | ||
use crate::coord::appends::{BuiltinTableUpdateSource, Deferred, PendingWriteTxn}; | ||
use crate::coord::id_bundle::CollectionIdBundle; | ||
use crate::coord::peek::PendingPeek; | ||
use crate::coord::read_policy::{ReadCapability, ReadHolds}; | ||
|
@@ -122,7 +122,7 @@ use crate::error::AdapterError; | |
use crate::session::{EndTransactionAction, Session}; | ||
use crate::sink_connection; | ||
use crate::tail::PendingTail; | ||
use crate::util::ClientTransmitter; | ||
use crate::util::{ClientTransmitter, CompletedClientTransmitter}; | ||
|
||
pub(crate) mod id_bundle; | ||
pub(crate) mod peek; | ||
|
@@ -154,11 +154,16 @@ pub enum Message<T = mz_repr::Timestamp> { | |
SinkConnectionReady(SinkConnectionReady), | ||
SendDiffs(SendDiffs), | ||
WriteLockGrant(tokio::sync::OwnedMutexGuard<()>), | ||
AdvanceTimelines, | ||
AdvanceLocalInput(AdvanceLocalInput<T>), | ||
GroupCommit, | ||
GroupCommitInitiate, | ||
GroupCommitApply( | ||
T, | ||
Vec<CompletedClientTransmitter<ExecuteResponse>>, | ||
Option<OwnedMutexGuard<()>>, | ||
), | ||
ComputeInstanceStatus(ComputeInstanceEvent), | ||
RemovePendingPeeks { conn_id: ConnectionId }, | ||
RemovePendingPeeks { | ||
conn_id: ConnectionId, | ||
}, | ||
LinearizeReads(Vec<PendingTxn>), | ||
} | ||
|
||
|
@@ -652,7 +657,7 @@ impl<S: Append + 'static> Coordinator<S> { | |
let WriteTimestamp { | ||
timestamp: _, | ||
advance_to, | ||
} = self.get_and_step_local_write_ts().await; | ||
} = self.get_local_write_ts().await; | ||
let appends = entries | ||
.iter() | ||
.filter(|entry| entry.is_table()) | ||
|
@@ -688,7 +693,8 @@ impl<S: Append + 'static> Coordinator<S> { | |
builtin_table_updates.extend(retractions); | ||
} | ||
|
||
self.send_builtin_table_updates(builtin_table_updates).await; | ||
self.send_builtin_table_updates(builtin_table_updates, BuiltinTableUpdateSource::DDL) | ||
.await; | ||
|
||
Ok(()) | ||
} | ||
|
@@ -754,7 +760,7 @@ impl<S: Append + 'static> Coordinator<S> { | |
} | ||
// `tick()` on `Interval` is cancel-safe: | ||
// https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety | ||
_ = advance_timelines_interval.tick() => Message::AdvanceTimelines, | ||
_ = advance_timelines_interval.tick() => Message::GroupCommitInitiate, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we wanted to keep things moving, we could tick this each time the group commit completes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't be against that. My one fear is that we'd end up clogging the Coordinator up somehow, but since append is now done asynchronously it should be fine? |
||
}; | ||
|
||
// All message processing functions trace. Start a parent span for them to make | ||
|
@@ -763,10 +769,6 @@ impl<S: Append + 'static> Coordinator<S> { | |
let _enter = span.enter(); | ||
|
||
self.handle_message(msg).await; | ||
|
||
if let Some(timestamp) = self.get_local_timestamp_oracle_mut().should_advance_to() { | ||
self.queue_local_input_advances(timestamp).await; | ||
} | ||
Comment on lines
-778
to
-780
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By removing this, do we risk making the table advancement a low priority event? Seems removing this takes it from "once per message received" to "lowest priority event". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Queuing a table advancement was happening on a "once per message received" rate. However, actually executing the table advancements was already the lowest priority message that the Coordinator handled. In practice the only things that triggered this were group commit, DDL, and the |
||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but it would be helpful to have comments on these variants. I'm sort of guessing what
Apply
means, for example, maybe "it finished"? But, this enum in particular calls out for doccomments explaining what each message means.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, btw I was having a hard time coming up with a good name for this variant and settled on "Apply" if you have a better idea then let me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is that "Apply" makes a group commit visible by doing the following: