coord: Prioritize read latency over write latency #13967
Conversation
FYI for anyone reading this. At one point this was passing every test but a single flaky test: https://buildkite.com/materialize/tests/builds/38774. Although I've been force pushing, you can get to the code from the build if you want to see the difference. So hopefully this isn't too far from getting back to passing every test. EDIT: This is now once again passing every test but a single flaky one.
The goal of this commit is to make sure that the global timestamp never advances past the upper of any table. Doing this ensures that a table read never has to block and wait for an upper to advance, which should improve the performance of many table reads. This performance improvement comes at the cost of performance for table writes and DDL. This is accomplished by combining table advancements into group commit. All writes advance every other table to the same timestamp as the write. Periodically, empty group commits are triggered that include no writes but advance every table. The global timestamp is only advanced after these table advancements, to the timestamp of the advancement.
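The scheme described above can be sketched as a tiny model. To be clear, this is not the actual Materialize code: the `Tables` struct and the `group_commit`/`empty_group_commit`/`invariant_holds` names are invented for illustration, and timestamps are plain `u64`s.

```rust
use std::collections::BTreeMap;

#[derive(Default)]
struct Tables {
    /// Upper (exclusive write frontier) per table id.
    uppers: BTreeMap<u64, u64>,
    /// Global timestamp that reads use; must never pass any upper.
    global_ts: u64,
}

impl Tables {
    /// A group commit at `ts`: write to `table`, advance every other
    /// table's upper to the same timestamp, then advance the global
    /// timestamp. A read at `global_ts` therefore never blocks on an upper.
    fn group_commit(&mut self, table: u64, ts: u64) {
        self.uppers.insert(table, ts + 1);
        for upper in self.uppers.values_mut() {
            *upper = (*upper).max(ts + 1);
        }
        self.global_ts = ts;
    }

    /// An empty group commit: no writes, but every table is advanced.
    fn empty_group_commit(&mut self, ts: u64) {
        for upper in self.uppers.values_mut() {
            *upper = (*upper).max(ts + 1);
        }
        self.global_ts = ts;
    }

    /// The invariant from the commit message: global_ts < every upper.
    fn invariant_holds(&self) -> bool {
        self.uppers.values().all(|&u| self.global_ts < u)
    }
}

fn main() {
    let mut t = Tables::default();
    t.uppers.insert(1, 0);
    t.uppers.insert(2, 0);
    t.group_commit(1, 5);
    assert!(t.invariant_holds());
    t.empty_group_commit(9);
    assert!(t.invariant_holds());
    println!("invariant holds: {}", t.invariant_holds());
}
```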
I read it! I'm not certain I understand all of it (various nits about whether we are in DDL, changing behavior based on held locks, etc.). The idea seems good, though, but I'd love for us to take a pass at some point to clarify the state machine that the coordinator is implementing.
@@ -754,7 +760,7 @@ impl<S: Append + 'static> Coordinator<S> {
    }
    // `tick()` on `Interval` is cancel-safe:
    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
-   _ = advance_timelines_interval.tick() => Message::AdvanceTimelines,
+   _ = advance_timelines_interval.tick() => Message::GroupCommitInitiate,
If we wanted to keep things moving, we could tick this each time the group commit completes.
I wouldn't be against that. My one fear is that we'd end up clogging the Coordinator up somehow, but since append is now done asynchronously it should be fine?
if let Some(timestamp) = self.get_local_timestamp_oracle_mut().should_advance_to() {
    self.queue_local_input_advances(timestamp).await;
}
By removing this, do we risk making the table advancement a low priority event? Seems removing this takes it from "once per message received" to "lowest priority event".
Queuing a table advancement was happening on a "once per message received" rate. However, actually executing the table advancements was already the lowest priority message that the Coordinator handled.
In practice the only things that triggered this were group commit, DDL, and the advance_timelines_interval, all of which now explicitly advance tables.
src/adapter/src/coord.rs
Outdated
GroupCommitInitiate,
GroupCommitApply(
    T,
    Vec<CompletedClientTransmitter<ExecuteResponse>>,
    Option<OwnedMutexGuard<()>>,
),
Nit, but it would be helpful to have comments on these variants. I'm sort of guessing what Apply means, for example, maybe "it finished"? But this enum in particular calls out for doccomments explaining what each message means.
Good idea. Btw, I was having a hard time coming up with a good name for this variant and settled on "Apply"; if you have a better idea, let me know.
The idea is that "Apply" makes a group commit visible by doing the following:
- Advancing the timeline.
- Sending responses to clients.
- Updating read holds (this is purely for convenience; we could do it elsewhere).
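For what it's worth, a doccomment sketch along those lines might look like the following. This is a hypothetical simplification of the real message type: the `ClientResponse` placeholder stands in for `CompletedClientTransmitter<ExecuteResponse>`, and the `describe` helper is invented for illustration.

```rust
/// Placeholder for a queued client response; the real type is
/// `CompletedClientTransmitter<ExecuteResponse>`.
struct ClientResponse;

/// Simplified coordinator messages for group commit.
enum Message<T> {
    /// Start a group commit: gather all pending writes, choose a single
    /// write timestamp, and append to the tables at that timestamp.
    GroupCommitInitiate,
    /// A group commit finished appending; make it visible by advancing
    /// the timeline to the given timestamp, updating read holds, and
    /// sending the queued responses back to clients.
    GroupCommitApply(T, Vec<ClientResponse>),
}

fn describe<T>(msg: &Message<T>) -> &'static str {
    match msg {
        Message::GroupCommitInitiate => "initiate",
        Message::GroupCommitApply(_, _) => "apply",
    }
}

fn main() {
    let msg: Message<u64> = Message::GroupCommitApply(5, vec![ClientResponse]);
    println!("{}", describe(&msg));
}
```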
src/adapter/src/coord/timeline.rs
Outdated
/// and writes happen at the write timestamp. After the write has completed,
/// but before a response is sent, the read timestamp must be updated to the
/// same value as the write timestamp.
Is the condition maybe weaker, that "before responding to a write, the read timestamp must be updated to the timestamp of the write"? Or must the read timestamp be updated to self.write_ts?
What about something like: "After the write has completed, but before a response is sent, the read timestamp must be updated to a value greater than or equal to self.write_ts."
That's a bit weaker and more accurate.
if next.less_equal(&self.state.write_ts) {
    next = self.state.write_ts.step_forward();
}
I'm not sure I follow this. I think this was inherited from the Reading(ts) case above, but I think here it is not as necessary? Or if it is, the comparison should be with self.state.read_ts instead of write_ts, as we want to be sure the write occurs after any previously issued read timestamp.
Maybe this is an abstraction leak, but my thought process was the following. You can't ever call append twice with the same timestamp, so to prevent that from happening, every write timestamp is larger than the previous write timestamp returned (this should be reflected in the doccomments, which it currently is not).
/// Peek the current write timestamp.
fn peek_write_ts(&self) -> T {
I read this and thought that maybe there was a different way to go from the previous design to here. I'll have to see how peek_write_ts gets used, but if it was just where we previously used write_ts, maybe there should be write_ts and seal_write_ts, or so.
peek_write_ts is used where previously peek_ts was used. Group commit uses it to determine how long to sleep in case the write timestamp is ahead of the system clock.
src/adapter/src/coord/appends.rs
Outdated
pub(crate) enum BuiltinTableUpdateSource {
    DDL,
    Background,
}
Doccomments on here to communicate the meaning.
Added
// write was waiting, then the write will be ignored and we respond to the
// client that the write was successful. This is only possible if the write
Might this be weird for a person observing through a SINK or TAIL?
yes, that's a good point. If someone is executing a tail, there's a possibility that we respond with an OK to a client who performs an insert and the person executing the tail won't see it.
This is actually existing behavior, but probably not very good. Now that DDL is combined with writes, we should be able to remove this condition: before actually deleting a table, we'd execute all pending writes, which means that this scenario is impossible. I'd probably prefer to do that in a follow-up though. I'll open an issue.
src/adapter/src/coord/appends.rs
Outdated
    }
}
/// Applies the results of a completed group commit. The global timestamp will be advanced to |
"The global timestamp" -> "read timestamp"? I'm not actually sure what the global timestamp is tbh. This probably means something like "the read timestamp of the timeline(s) of the writes"?
I think there's still a lot of language in the comments in the Coord that assumes there's only a single timeline. The global timestamp generally refers to the timestamp of the real time timeline.
How does this sound:
The read timestamp of the timeline containing user tables will be advanced to the timestamp of the completed write, the read hold on the timeline containing user tables is advanced to the new time, and responses are sent to all waiting clients.
src/adapter/src/coord/appends.rs
Outdated
self.apply_local_write(timestamp).await;
// If we're in the middle of DDL then the catalog and COMPUTE/STORAGE may be out of sync,
// so we don't advance other timeliens and we don't update read holds.
typo: "timeliens"
Fixed
…ance-table-group-commit (Conflicts: src/adapter/src/coord.rs)
The goal of this commit is to make sure that the global timestamp never
advances past the upper of any table. Doing this will make sure that a
table read will never have to block and wait for an upper to advance,
which should improve the performance of many table reads. This
performance improvement comes at the cost of performance for table
writes and DDL.
This is accomplished by combining table advancements into group commit.
All writes advance every other table to the same timestamp as the write.
Periodically, empty group commits are triggered that include no writes
but advance every table. The global timestamp is only advanced after
these table advancements, to the timestamp of the advancement.
Motivation
This PR improves read performance.
Checklist
This PR has adequate test coverage / QA involvement has been duly considered.
This PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-protobuf label.
This PR includes the following user-facing behavior changes: