Skip to content

Commit 5948fef

Browse files
authored
Minor Refactoring of re-used code and server stat reporting (#129)
* Minor changes to stats reporting and recduce re-used code * fmt
1 parent 790898c commit 5948fef

File tree

2 files changed

+49
-67
lines changed

2 files changed

+49
-67
lines changed

src/client.rs

Lines changed: 47 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,6 @@ where
667667
.client_disconnecting(self.process_id, last_address_id);
668668
}
669669
self.stats.client_active(self.process_id, address.id);
670-
self.stats.server_active(server.process_id(), address.id);
671670

672671
self.last_address_id = Some(address.id);
673672
self.last_server_id = Some(server.process_id());
@@ -731,52 +730,23 @@ where
731730
'Q' => {
732731
debug!("Sending query to server");
733732

734-
self.send_server_message(
735-
server,
733+
self.send_and_receive_loop(
734+
code,
736735
original,
736+
server,
737737
&address,
738738
query_router.shard(),
739739
&pool,
740740
)
741741
.await?;
742742

743-
// Read all data the server has to offer, which can be multiple messages
744-
// buffered in 8196 bytes chunks.
745-
loop {
746-
let response = self
747-
.receive_server_message(
748-
server,
749-
&address,
750-
query_router.shard(),
751-
&pool,
752-
)
753-
.await?;
754-
755-
// Send server reply to the client.
756-
match write_all_half(&mut self.write, response).await {
757-
Ok(_) => (),
758-
Err(err) => {
759-
server.mark_bad();
760-
return Err(err);
761-
}
762-
};
763-
764-
if !server.is_data_available() {
765-
break;
766-
}
767-
}
768-
769-
// Report query executed statistics.
770-
self.stats.query(self.process_id, address.id);
771-
772743
if !server.in_transaction() {
773744
// Report transaction executed statistics.
774745
self.stats.transaction(self.process_id, address.id);
775746

776747
// Release server back to the pool if we are in transaction mode.
777748
// If we are in session mode, we keep the server until the client disconnects.
778749
if self.transaction_mode {
779-
self.stats.server_idle(server.process_id(), address.id);
780750
break;
781751
}
782752
}
@@ -830,9 +800,10 @@ where
830800

831801
self.buffer.put(&original[..]);
832802

833-
self.send_server_message(
834-
server,
803+
self.send_and_receive_loop(
804+
code,
835805
self.buffer.clone(),
806+
server,
836807
&address,
837808
query_router.shard(),
838809
&pool,
@@ -841,41 +812,12 @@ where
841812

842813
self.buffer.clear();
843814

844-
// Read all data the server has to offer, which can be multiple messages
845-
// buffered in 8196 bytes chunks.
846-
loop {
847-
let response = self
848-
.receive_server_message(
849-
server,
850-
&address,
851-
query_router.shard(),
852-
&pool,
853-
)
854-
.await?;
855-
856-
match write_all_half(&mut self.write, response).await {
857-
Ok(_) => (),
858-
Err(err) => {
859-
server.mark_bad();
860-
return Err(err);
861-
}
862-
};
863-
864-
if !server.is_data_available() {
865-
break;
866-
}
867-
}
868-
869-
// Report query executed statistics.
870-
self.stats.query(self.process_id, address.id);
871-
872815
if !server.in_transaction() {
873816
self.stats.transaction(self.process_id, address.id);
874817

875818
// Release server back to the pool if we are in transaction mode.
876819
// If we are in session mode, we keep the server until the client disconnects.
877820
if self.transaction_mode {
878-
self.stats.server_idle(server.process_id(), address.id);
879821
break;
880822
}
881823
}
@@ -925,7 +867,6 @@ where
925867
// Release server back to the pool if we are in transaction mode.
926868
// If we are in session mode, we keep the server until the client disconnects.
927869
if self.transaction_mode {
928-
self.stats.server_idle(server.process_id(), address.id);
929870
break;
930871
}
931872
}
@@ -941,6 +882,7 @@ where
941882

942883
// The server is no longer bound to us, we can't cancel it's queries anymore.
943884
debug!("Releasing server back into the pool");
885+
self.stats.server_idle(server.process_id(), address.id);
944886
self.release();
945887
self.stats.client_idle(self.process_id, address.id);
946888
}
@@ -952,6 +894,46 @@ where
952894
guard.remove(&(self.process_id, self.secret_key));
953895
}
954896

897+
async fn send_and_receive_loop(
898+
&mut self,
899+
code: char,
900+
message: BytesMut,
901+
server: &mut Server,
902+
address: &Address,
903+
shard: usize,
904+
pool: &ConnectionPool,
905+
) -> Result<(), Error> {
906+
debug!("Sending {} to server", code);
907+
908+
self.send_server_message(server, message, &address, shard, &pool)
909+
.await?;
910+
911+
// Read all data the server has to offer, which can be multiple messages
912+
// buffered in 8196 bytes chunks.
913+
loop {
914+
let response = self
915+
.receive_server_message(server, &address, shard, &pool)
916+
.await?;
917+
918+
match write_all_half(&mut self.write, response).await {
919+
Ok(_) => (),
920+
Err(err) => {
921+
server.mark_bad();
922+
return Err(err);
923+
}
924+
};
925+
926+
if !server.is_data_available() {
927+
break;
928+
}
929+
}
930+
931+
// Report query executed statistics.
932+
self.stats.query(self.process_id, address.id);
933+
934+
Ok(())
935+
}
936+
955937
async fn send_server_message(
956938
&self,
957939
server: &mut Server,

src/pool.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ impl ConnectionPool {
335335
if !require_healthcheck {
336336
self.stats
337337
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
338-
self.stats.server_idle(conn.process_id(), address.id);
338+
self.stats.server_active(conn.process_id(), address.id);
339339
return Ok((conn, address.clone()));
340340
}
341341

@@ -354,7 +354,7 @@ impl ConnectionPool {
354354
Ok(_) => {
355355
self.stats
356356
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
357-
self.stats.server_idle(conn.process_id(), address.id);
357+
self.stats.server_active(conn.process_id(), address.id);
358358
return Ok((conn, address.clone()));
359359
}
360360

0 commit comments

Comments
 (0)