2
2
/// We are pretending to the server in this scenario,
3
3
/// and this module implements that.
4
4
use bytes:: { Buf , BufMut , BytesMut } ;
5
- use log:: error;
5
+ use log:: { debug , error} ;
6
6
use tokio:: io:: { AsyncReadExt , BufReader } ;
7
7
use tokio:: net:: {
8
8
tcp:: { OwnedReadHalf , OwnedWriteHalf } ,
@@ -70,6 +70,8 @@ impl Client {
70
70
let transaction_mode = config. general . pool_mode . starts_with ( "t" ) ;
71
71
drop ( config) ;
72
72
loop {
73
+ debug ! ( "Waiting for StartupMessage" ) ;
74
+
73
75
// Could be StartupMessage or SSLRequest
74
76
// which makes this variable length.
75
77
let len = match stream. read_i32 ( ) . await {
@@ -91,6 +93,8 @@ impl Client {
91
93
match code {
92
94
// Client wants SSL. We don't support it at the moment.
93
95
SSL_REQUEST_CODE => {
96
+ debug ! ( "Rejecting SSLRequest" ) ;
97
+
94
98
let mut no = BytesMut :: with_capacity ( 1 ) ;
95
99
no. put_u8 ( b'N' ) ;
96
100
@@ -99,6 +103,8 @@ impl Client {
99
103
100
104
// Regular startup message.
101
105
PROTOCOL_VERSION_NUMBER => {
106
+ debug ! ( "Got StartupMessage" ) ;
107
+
102
108
// TODO: perform actual auth.
103
109
let parameters = parse_startup ( bytes. clone ( ) ) ?;
104
110
@@ -110,6 +116,7 @@ impl Client {
110
116
write_all ( & mut stream, server_info) . await ?;
111
117
backend_key_data ( & mut stream, process_id, secret_key) . await ?;
112
118
ready_for_query ( & mut stream) . await ?;
119
+ debug ! ( "Startup OK" ) ;
113
120
114
121
// Split the read and write streams
115
122
// so we can control buffering.
@@ -161,6 +168,8 @@ impl Client {
161
168
pub async fn handle ( & mut self , mut pool : ConnectionPool ) -> Result < ( ) , Error > {
162
169
// The client wants to cancel a query it has issued previously.
163
170
if self . cancel_mode {
171
+ debug ! ( "Sending CancelRequest" ) ;
172
+
164
173
let ( process_id, secret_key, address, port) = {
165
174
let guard = self . client_server_map . lock ( ) . unwrap ( ) ;
166
175
@@ -193,6 +202,8 @@ impl Client {
193
202
// We expect the client to either start a transaction with regular queries
194
203
// or issue commands for our sharding and server selection protocols.
195
204
loop {
205
+ debug ! ( "Client idle, waiting for message" ) ;
206
+
196
207
// Client idle, waiting for messages.
197
208
self . stats . client_idle ( self . process_id ) ;
198
209
@@ -203,6 +214,12 @@ impl Client {
203
214
// SET SHARDING KEY TO 'bigint';
204
215
let mut message = read_message ( & mut self . read ) . await ?;
205
216
217
+ // Avoid taking a server if the client just wants to disconnect.
218
+ if message[ 0 ] as char == 'X' {
219
+ debug ! ( "Client disconnecting" ) ;
220
+ return Ok ( ( ) ) ;
221
+ }
222
+
206
223
// Handle all custom protocol commands here.
207
224
match query_router. try_execute_command ( message. clone ( ) ) {
208
225
// Normal query
@@ -250,9 +267,14 @@ impl Client {
250
267
// Waiting for server connection.
251
268
self . stats . client_waiting ( self . process_id ) ;
252
269
270
+ debug ! ( "Waiting for connection from pool" ) ;
271
+
253
272
// Grab a server from the pool: the client issued a regular query.
254
273
let connection = match pool. get ( query_router. shard ( ) , query_router. role ( ) ) . await {
255
- Ok ( conn) => conn,
274
+ Ok ( conn) => {
275
+ debug ! ( "Got connection from pool" ) ;
276
+ conn
277
+ }
256
278
Err ( err) => {
257
279
error ! ( "Could not get connection from pool: {:?}" , err) ;
258
280
error_response ( & mut self . write , "could not get connection from the pool" )
@@ -272,11 +294,19 @@ impl Client {
272
294
self . stats . client_active ( self . process_id ) ;
273
295
self . stats . server_active ( server. process_id ( ) ) ;
274
296
297
+ debug ! (
298
+ "Client {:?} talking to server {:?}" ,
299
+ self . write. peer_addr( ) . unwrap( ) ,
300
+ server. address( )
301
+ ) ;
302
+
275
303
// Transaction loop. Multiple queries can be issued by the client here.
276
304
// The connection belongs to the client until the transaction is over,
277
305
// or until the client disconnects if we are in session mode.
278
306
loop {
279
307
let mut message = if message. len ( ) == 0 {
308
+ debug ! ( "Waiting for message inside transaction or in session mode" ) ;
309
+
280
310
match read_message ( & mut self . read ) . await {
281
311
Ok ( message) => message,
282
312
Err ( err) => {
@@ -303,9 +333,13 @@ impl Client {
303
333
let code = message. get_u8 ( ) as char ;
304
334
let _len = message. get_i32 ( ) as usize ;
305
335
336
+ debug ! ( "Message: {}" , code) ;
337
+
306
338
match code {
307
339
// ReadyForQuery
308
340
'Q' => {
341
+ debug ! ( "Sending query to server" ) ;
342
+
309
343
// TODO: implement retries here for read-only transactions.
310
344
server. send ( original) . await ?;
311
345
@@ -387,6 +421,8 @@ impl Client {
387
421
// Sync
388
422
// Frontend (client) is asking for the query result now.
389
423
'S' => {
424
+ debug ! ( "Sending query to server" ) ;
425
+
390
426
self . buffer . put ( & original[ ..] ) ;
391
427
392
428
// TODO: retries for read-only transactions.
@@ -471,6 +507,7 @@ impl Client {
471
507
}
472
508
473
509
// The server is no longer bound to us, we can't cancel it's queries anymore.
510
+ debug ! ( "Releasing server back into the pool" ) ;
474
511
self . release ( ) ;
475
512
}
476
513
}
0 commit comments