coord: On Startup wait for table writes in caller #21783
Conversation
Force-pushed from eb27a7d to 4ae142a
@@ -775,7 +779,7 @@ impl Coordinator {
        self.cancel_pending_peeks(conn.conn_id());
        self.end_session_for_statement_logging(conn.uuid());
        let update = self.catalog().state().pack_session_update(&conn, -1);
-       self.send_builtin_table_updates(vec![update]).await;
+       self.buffer_builtin_table_updates(vec![update]);
I'm just thinking out loud here. It's possible that the following happens in order:
- A client terminates their session.
- They receive a response that their session has been terminated.
- Another client queries `mz_sessions` and sees that the session hasn't been terminated.

If we were being extremely strict, one could argue that this is a linearizability violation. I think in practice it's probably fine, but I just wanted to call it out to see what people thought.
I reverted this change and then re-added it because I noticed that when tearing down many connections we could effectively hang the coordinator, since it had to process hundreds or thousands of table writes. I'm more than happy to go back to blocking on this change, but the effect it has on the coordinator did not seem great.
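For readers following along, here is a minimal, self-contained sketch of what "buffering instead of awaiting" means here; the field and method bodies are illustrative assumptions, not the PR's actual code:

```rust
// Sketch (not the real Coordinator): buffering a builtin table update is just
// a Vec push, so tearing down thousands of connections no longer forces the
// coordinator to await thousands of table writes inline.

struct BuiltinTableUpdate; // stand-in for the real update type

struct Coordinator {
    buffered_builtin_table_updates: Vec<BuiltinTableUpdate>,
}

impl Coordinator {
    fn buffer_builtin_table_updates(&mut self, updates: Vec<BuiltinTableUpdate>) {
        // Deferred: a later group commit drains this buffer and writes it out.
        self.buffered_builtin_table_updates.extend(updates);
    }
}

fn main() {
    let mut coord = Coordinator { buffered_builtin_table_updates: Vec::new() };
    coord.buffer_builtin_table_updates(vec![BuiltinTableUpdate]);
    assert_eq!(coord.buffered_builtin_table_updates.len(), 1);
}
```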
I think it's fine, can we note it down in a comment here?
Added a comment!
src/adapter/src/coord.rs
Outdated
/// Operations waiting on this group commit to finish.
Vec<oneshot::Sender<()>>,
Is there a way to combine this with the `Vec<CompletedClientTransmitter>`, or at least clarify what the differences are? It's a bit confusing that there are two things used to notify clients that are waiting on a response.
I wanted to combine them, but I don't think it's possible. I added a comment explaining the differences.
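To make the distinction concrete, here is a hedged sketch of how the two notification mechanisms might sit side by side; the type and field names, and the characterization of their roles, are assumptions for illustration rather than the actual comment added in the PR:

```rust
use tokio::sync::oneshot;

/// Stand-in for the real type that transmits a statement's result back to the
/// client whose write was folded into this group commit.
struct CompletedClientTransmitter;

/// Sketch of a group commit's notification state.
struct PendingGroupCommit {
    /// Clients whose DML is part of this commit; they are waiting on the
    /// statement's actual result.
    client_responses: Vec<CompletedClientTransmitter>,
    /// Internal operations (e.g. startup registering a session) that only need
    /// to know the write has landed, not any query result.
    completion_waiters: Vec<oneshot::Sender<()>>,
}

impl PendingGroupCommit {
    fn finish(self) {
        // The real code would transmit results via `client_responses` here.
        let _ = self.client_responses;
        for tx in self.completion_waiters {
            // Receivers may have gone away; ignore send errors.
            let _ = tx.send(());
        }
    }
}

fn main() {
    let (tx, mut rx) = oneshot::channel();
    let pending = PendingGroupCommit {
        client_responses: vec![CompletedClientTransmitter],
        completion_waiters: vec![tx],
    };
    pending.finish();
    assert!(rx.try_recv().is_ok());
}
```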
@@ -1913,7 +1926,7 @@ impl Coordinator {
        _ = self.advance_timelines_interval.tick() => {
            let span = info_span!(parent: None, "advance_timelines_interval");
            span.follows_from(Span::current());
-           Message::GroupCommitInitiate(span)
+           Message::GroupCommitInitiate(span, None)
I actually think we should somehow fold this into the `group_commit_rx`. Maybe add a `select!` internal to the waiter that selects between the notifier and a one-second tick.
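A hedged sketch of that suggestion, assuming a `tokio` channel as the notifier and a one-second interval; the channel and variable names are made up for the example:

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time;

// Sketch: a single waiter selects between an explicit "group commit requested"
// notification and a periodic 1s tick, so the interval no longer needs its own
// arm in the coordinator's main select loop.
#[tokio::main]
async fn main() {
    let (group_commit_tx, mut group_commit_rx) = mpsc::unbounded_channel::<()>();
    let mut tick = time::interval(Duration::from_secs(1));

    // Pretend something requests a group commit shortly after startup.
    tokio::spawn(async move {
        time::sleep(Duration::from_millis(100)).await;
        let _ = group_commit_tx.send(());
    });

    for _ in 0..2 {
        tokio::select! {
            _ = group_commit_rx.recv() => println!("group commit requested explicitly"),
            _ = tick.tick() => println!("periodic tick: advance timelines"),
        }
    }
}
```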
Talked with Joe and Aljoscha offline; our current thought is to move the GroupCommit logic into its own component/select loop that handles everything: advancing the timelines, low priority writes, user writes, system writes, etc. Once I have an issue up describing that, I'll circle back here with a comment.
src/adapter/src/coord/appends.rs
Outdated
pub(crate) fn submit_builtin_table_updates<'a>(
    &'a mut self,
    updates: Vec<BuiltinTableUpdate>,
) -> BoxFuture<'static, ()> {
    let (tx, rx) = oneshot::channel();
    self.pending_writes.push(PendingWriteTxn::System {
        updates,
        source: BuiltinTableUpdateSource::Background(tx),
    });
    self.trigger_group_commit();
    Box::pin(rx.map(|_| ()))
}
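One design note the snippet makes visible: the method takes `&mut self` but returns a `BoxFuture<'static, ()>` backed only by the `oneshot` receiver, so a caller can enqueue the write while it holds the coordinator and then await the confirmation elsewhere, e.g. from the connection task during startup, which is presumably what lets the wait move out of the coordinator as the PR title describes.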
A bit of bike-shedding, but it's not clear from their names what the difference between `submit_builtin_table_updates` and `send_builtin_table_updates` is. Perhaps we can come up with better names?
I renamed `submit_builtin_table_updates` to `send_builtin_table_updates_notify`; I also thought about `send_builtin_table_updates_non_blocking`. Any thoughts?
Maybe something like `execute_builtin_table_updates_immediately` (maybe omit immediately) and `submit_builtin_table_updates`?
Updated the naming to use `_blocking` and `_defer` suffixes!
src/adapter/src/coord/appends.rs
Outdated
// We want to allow system writes that have external requests waiting on them, through as
// quickly as possible.
I'm not entirely convinced about this; why would this not be true for user writes?
Updated to call out that this is a hack to get tests passing
    updates,
    source: BuiltinTableUpdateSource::Async(tx),
});
self.trigger_group_commit();
I think we want to add the following below this line (like in the method above):
// Avoid excessive group commits by resetting the periodic table advancement timer. The
// group commit triggered by above will already advance all tables.
self.advance_timelines_interval.reset();
The call to `trigger_group_commit()` resets the `advance_timelines_interval`.
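As a sketch of why the suggested reset is already covered (names assumed, not copied from the PR):

```rust
use std::time::Duration;
use tokio::time::{self, Interval};

// Sketch: trigger_group_commit itself resets the periodic advancement timer,
// so callers don't need a separate advance_timelines_interval.reset().
struct Coordinator {
    advance_timelines_interval: Interval,
}

impl Coordinator {
    fn trigger_group_commit(&mut self) {
        // ...send the GroupCommitInitiate message in the real code...
        // Resetting here avoids an immediately-following periodic group commit.
        self.advance_timelines_interval.reset();
    }
}

#[tokio::main]
async fn main() {
    let mut coord = Coordinator {
        advance_timelines_interval: time::interval(Duration::from_secs(1)),
    };
    coord.trigger_group_commit();
}
```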
Force-pushed from af88bb3 to 92473df
Force-pushed from 6be36c0 to 90e8539
@@ -2381,3 +2383,43 @@ def validate_since(since: int, name: str) -> None:
     table_since2, index_since2 = collect_sinces()
     validate_since(table_since2, "table_since2")
     validate_since(index_since2, "index_since2")
+
+
+def workflow_test_concurrent_connections(c: Composition) -> None:
@ParkMyCar I put my explicit test directly in here, hope that's fine for you. CC @philip-stoev

This is based on Nikhil's suggestion in https://github.com/MaterializeInc/materialize/issues/21782#issuecomment-1722067303, but we don't hit the 0.15 s and 0.30 s milestones yet, so the test could still be changed later if things get better. On main this fails brutally (min: 164.59s, p50: 171.39s, p99: 178.49s, max: 178.55s); in this branch it succeeds locally for me with min: 0.27s, p50: 0.50s, p99: 0.77s, max: 0.87s.
Put it in Nightly when ready.
This will be auto-enabled in the regular test pipeline when this PR lands. It only takes a few seconds, so no need for Nightly, I think.
Oh sorry, you are right, line 89 enables it.
Thank you for adding this @def-! I'm quite surprised, but very happy, to see this test passes!
Force-pushed from 90e8539 to d5d2d85
Force-pushed from d5d2d85 to 500517f
This PR updates the `Client` we use to make Frontegg Auth requests to dedupe inflight requests. Specifically, when a request is made we check if a request to that endpoint with those arguments is already inflight; if so, we do not issue a second request and instead attach a listener/waiter to the already inflight request.

### Motivation

Helps improve: https://github.com/MaterializeInc/materialize/issues/21782

[Frontegg documents](https://docs.frontegg.com/docs/frontegg-rate-limit-policies#limits-for-frontegg-workspaces) the API we use for getting auth tokens as accepting 100 requests per second. As we attempt to scale to supporting thousands of concurrent connection requests (per user), we hit Frontegg's request limit. With this change + #21783 we have the following latencies when opening concurrent connections:

| num requests | p50     | p90     | p99     |
|--------------|---------|---------|---------|
| 32           | 34ms    | 371ms   | 495ms   |
| 64           | 25ms    | 285ms   | 367ms   |
| 128          | 31ms    | 189ms   | 331ms   |
| 256          | 66ms    | 565ms   | 660ms   |
| 512          | 4044ms  | 4828ms  | 4977ms  |
| 1024         | 9114ms  | 9880ms  | 10038ms |
| 2048         | 20031ms | 20784ms | 20931ms |
| 4096         | 21550ms | 22269ms | 22424ms |
| 8192         | 23174ms | 24440ms | 24571ms |

Something is still happening when we reach 512 connections, but this is about a 10x improvement over the current state of the world.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered.
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [x] This PR includes the following [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note):
  - N/a
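For illustration only, here is a self-contained sketch of that inflight-request deduplication idea; the real Frontegg client performs actual HTTP requests and error handling, and every name below (`DedupingClient`, `get_token`, the broadcast-channel approach) is an assumption, not the PR's code:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;

// Sketch: the first caller for a key issues the (simulated) request; later
// callers for the same key subscribe to the inflight result instead of
// issuing their own request.
#[derive(Clone)]
struct DedupingClient {
    inflight: Arc<Mutex<HashMap<String, broadcast::Sender<String>>>>,
}

impl DedupingClient {
    async fn get_token(&self, key: String) -> String {
        let mut rx = {
            let mut inflight = self.inflight.lock().unwrap();
            match inflight.get(&key) {
                // A request with these arguments is already inflight: wait on it.
                Some(tx) => tx.subscribe(),
                None => {
                    let (tx, rx) = broadcast::channel(1);
                    inflight.insert(key.clone(), tx);
                    // First caller: issue the (simulated) request in the background.
                    let inflight_map = Arc::clone(&self.inflight);
                    let request_key = key.clone();
                    tokio::spawn(async move {
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        let token = format!("token-for-{request_key}");
                        // Remove the entry, then broadcast the result to all waiters.
                        let tx = inflight_map.lock().unwrap().remove(&request_key).unwrap();
                        let _ = tx.send(token);
                    });
                    rx
                }
            }
        };
        rx.recv().await.expect("sender lives until the result is broadcast")
    }
}

#[tokio::main]
async fn main() {
    let client = DedupingClient { inflight: Arc::new(Mutex::new(HashMap::new())) };
    // Both calls share one underlying "request".
    let (a, b) = tokio::join!(
        client.get_token("user@example.com".into()),
        client.get_token("user@example.com".into()),
    );
    println!("{a} {b}");
}
```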
This PR improves environmentd's ability to handle many concurrent connection requests at once. During startup we write to the `mz_sessions` builtin table; this PR moves waiting for that write from the Coordinator into the caller, e.g. the pgwire connection task.

It also adds a new mechanism which attempts to be more performant in grouping commits by handing out a permit for a commit; while the permit is outstanding, low priority group commits are not allowed to run. It also no longer blocks termination on updating the `mz_sessions` builtin table. The reason being, when terminating many connections we could hang the coordinator for >30 seconds while we processed all of the builtin table writes.

Motivation

Helps improve: MaterializeInc/database-issues#6537

With this change + we have the following latencies when opening concurrent connections:

Something is still happening when we reach 512 connections, but this is about a 10x improvement over the current state of the world.
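The "permit" mechanism mentioned above could look roughly like the following sketch, which uses a `tokio` semaphore purely for illustration; the names and the semaphore choice are assumptions, not the PR's actual implementation:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Sketch: a single permit guards group commits, and low-priority (deferred)
// commits only run when they can grab it without waiting.
#[tokio::main]
async fn main() {
    let group_commit_permit = Arc::new(Semaphore::new(1));

    // A user write: take the permit, waiting for it if necessary.
    let permit = group_commit_permit.clone().acquire_owned().await.unwrap();
    // ...perform the group commit, then release the permit...
    drop(permit);

    // A low-priority write: only commit if no other commit holds the permit.
    match group_commit_permit.clone().try_acquire_owned() {
        Ok(_permit) => println!("no commit outstanding, run the deferred writes"),
        Err(_) => println!("a commit is outstanding, retry later"),
    }
}
```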
Checklist

- If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.