Skip to content

Minor Refactoring of re-used code and server stat reporting #129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 47 additions & 65 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,6 @@ where
.client_disconnecting(self.process_id, last_address_id);
}
self.stats.client_active(self.process_id, address.id);
self.stats.server_active(server.process_id(), address.id);

self.last_address_id = Some(address.id);
self.last_server_id = Some(server.process_id());
Expand Down Expand Up @@ -731,52 +730,23 @@ where
'Q' => {
debug!("Sending query to server");

self.send_server_message(
server,
self.send_and_receive_loop(
code,
original,
server,
&address,
query_router.shard(),
&pool,
)
.await?;

// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;

// Send server reply to the client.
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};

if !server.is_data_available() {
break;
}
}

// Report query executed statistics.
self.stats.query(self.process_id, address.id);

if !server.in_transaction() {
// Report transaction executed statistics.
self.stats.transaction(self.process_id, address.id);

// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break;
}
}
Expand Down Expand Up @@ -830,9 +800,10 @@ where

self.buffer.put(&original[..]);

self.send_server_message(
server,
self.send_and_receive_loop(
code,
self.buffer.clone(),
server,
&address,
query_router.shard(),
&pool,
Expand All @@ -841,41 +812,12 @@ where

self.buffer.clear();

// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;

match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};

if !server.is_data_available() {
break;
}
}

// Report query executed statistics.
self.stats.query(self.process_id, address.id);

if !server.in_transaction() {
self.stats.transaction(self.process_id, address.id);

// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idle in transaction would be a cool state to add to our stats some day.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I originally had some confusion about this metric. I thought it was reporting the state of server connections internally. But it seems like the way pgbouncer is doing it, is it's actually determining which connections are checked out, or no checked out

break;
}
}
Expand Down Expand Up @@ -925,7 +867,6 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break;
}
}
Expand All @@ -941,6 +882,7 @@ where

// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
self.stats.server_idle(server.process_id(), address.id);
self.release();
self.stats.client_idle(self.process_id, address.id);
}
Expand All @@ -952,6 +894,46 @@ where
guard.remove(&(self.process_id, self.secret_key));
}

async fn send_and_receive_loop(
&mut self,
code: char,
message: BytesMut,
server: &mut Server,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<(), Error> {
debug!("Sending {} to server", code);

self.send_server_message(server, message, &address, shard, &pool)
.await?;

// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(server, &address, shard, &pool)
.await?;

match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};

if !server.is_data_available() {
break;
}
}

// Report query executed statistics.
self.stats.query(self.process_id, address.id);

Ok(())
}

async fn send_server_message(
&self,
server: &mut Server,
Expand Down
4 changes: 2 additions & 2 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl ConnectionPool {
if !require_healthcheck {
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id);
self.stats.server_active(conn.process_id(), address.id);
return Ok((conn, address.clone()));
}

Expand All @@ -354,7 +354,7 @@ impl ConnectionPool {
Ok(_) => {
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id);
self.stats.server_active(conn.process_id(), address.id);
return Ok((conn, address.clone()));
}

Expand Down