1- use  std:: collections:: HashMap ; 
21use  std:: collections:: hash_map:: Entry ; 
2+ use  std:: collections:: HashMap ; 
33use  std:: path:: PathBuf ; 
44use  std:: sync:: Arc ; 
55use  std:: time:: { Duration ,  Instant } ; 
66
77use  bytes:: Bytes ; 
8+ use  either:: Either ; 
89use  libsqlx:: libsql:: { LibsqlDatabase ,  LogCompactor ,  LogFile ,  PrimaryType ,  ReplicaType } ; 
9- use  libsqlx:: proxy:: WriteProxyDatabase ; 
10+ use  libsqlx:: proxy:: { WriteProxyConnection ,  WriteProxyDatabase } ; 
11+ use  libsqlx:: result_builder:: ResultBuilder ; 
1012use  libsqlx:: { 
1113    Database  as  _,  DescribeResponse ,  Frame ,  FrameNo ,  InjectableDatabase ,  Injector ,  LogReadError , 
1214    ReplicationLogger , 
@@ -27,7 +29,11 @@ use self::config::{AllocConfig, DbConfig};
2729
2830pub  mod  config; 
2931
30- type  ExecFn  = Box < dyn  FnOnce ( & mut  dyn  libsqlx:: Connection )  + Send > ; 
32+ type  LibsqlConnection  = Either < 
33+     libsqlx:: libsql:: LibsqlConnection < PrimaryType > , 
34+     WriteProxyConnection < libsqlx:: libsql:: LibsqlConnection < ReplicaType > ,  DummyConn > , 
35+ > ; 
36+ type  ExecFn  = Box < dyn  FnOnce ( & mut  LibsqlConnection )  + Send > ; 
3137
3238#[ derive( Clone ) ]  
3339pub  struct  ConnectionId  { 
@@ -47,10 +53,10 @@ pub struct DummyDb;
4753pub  struct  DummyConn ; 
4854
4955impl  libsqlx:: Connection  for  DummyConn  { 
50-     fn  execute_program ( 
56+     fn  execute_program < B :   ResultBuilder > ( 
5157        & mut  self , 
52-         _pgm :  libsqlx:: program:: Program , 
53-         _result_builder :  & mut   dyn  libsqlx :: result_builder :: ResultBuilder , 
58+         _pgm :  & libsqlx:: program:: Program , 
59+         _result_builder :  B , 
5460    )  -> libsqlx:: Result < ( ) >  { 
5561        todo ! ( ) 
5662    } 
@@ -207,7 +213,12 @@ impl FrameStreamer {
207213                    if  !self . buffer . is_empty ( )  { 
208214                        self . send_frames ( ) . await ; 
209215                    } 
210-                     if  self . notifier . wait_for ( |fno| dbg ! ( * fno)  >= self . next_frame_no ) . await . is_err ( )  { 
216+                     if  self 
217+                         . notifier 
218+                         . wait_for ( |fno| * fno >= self . next_frame_no ) 
219+                         . await 
220+                         . is_err ( ) 
221+                     { 
211222                        break ; 
212223                    } 
213224                } 
@@ -244,7 +255,9 @@ impl Database {
244255                    path, 
245256                    Compactor , 
246257                    false , 
247-                     Box :: new ( move  |fno| {  let  _ = sender. send ( fno) ;  }  ) , 
258+                     Box :: new ( move  |fno| { 
259+                         let  _ = sender. send ( fno) ; 
260+                     } ) , 
248261                ) 
249262                . unwrap ( ) ; 
250263
@@ -253,7 +266,7 @@ impl Database {
253266                    replica_streams :  HashMap :: new ( ) , 
254267                    frame_notifier :  receiver, 
255268                } 
256-             } , 
269+             } 
257270            DbConfig :: Replica  {  primary_node_id }  => { 
258271                let  rdb = LibsqlDatabase :: new_replica ( path,  MAX_INJECTOR_BUFFER_CAP ,  ( ) ) . unwrap ( ) ; 
259272                let  wdb = DummyDb ; 
@@ -285,10 +298,10 @@ impl Database {
285298        } 
286299    } 
287300
288-     fn  connect ( & self )  -> Box < dyn  libsqlx :: Connection  +  Send >  { 
301+     fn  connect ( & self )  -> LibsqlConnection  { 
289302        match  self  { 
290-             Database :: Primary  {  db,  .. }  => Box :: new ( db. connect ( ) . unwrap ( ) ) , 
291-             Database :: Replica  {  db,  .. }  => Box :: new ( db. connect ( ) . unwrap ( ) ) , 
303+             Database :: Primary  {  db,  .. }  => Either :: Left ( db. connect ( ) . unwrap ( ) ) , 
304+             Database :: Replica  {  db,  .. }  => Either :: Right ( db. connect ( ) . unwrap ( ) ) , 
292305        } 
293306    } 
294307} 
@@ -315,11 +328,11 @@ pub struct ConnectionHandle {
315328impl  ConnectionHandle  { 
316329    pub  async  fn  exec < F ,  R > ( & self ,  f :  F )  -> crate :: Result < R > 
317330    where 
318-         F :  for < ' a >  FnOnce ( & ' a  mut  ( dyn  libsqlx :: Connection  +  ' a ) )  -> R  + Send  + ' static , 
331+         F :  for < ' a >  FnOnce ( & ' a  mut  LibsqlConnection )  -> R  + Send  + ' static , 
319332        R :  Send  + ' static , 
320333    { 
321334        let  ( sender,  ret)  = oneshot:: channel ( ) ; 
322-         let  cb = move  |conn :  & mut  dyn  libsqlx :: Connection | { 
335+         let  cb = move  |conn :  & mut  LibsqlConnection | { 
323336            let  res = f ( conn) ; 
324337            let  _ = sender. send ( res) ; 
325338        } ; 
@@ -371,9 +384,15 @@ impl Allocation {
371384            Message :: Handshake  {  .. }  => unreachable ! ( "handshake should have been caught earlier" ) , 
372385            Message :: ReplicationHandshake  {  .. }  => todo ! ( ) , 
373386            Message :: ReplicationHandshakeResponse  {  .. }  => todo ! ( ) , 
374-             Message :: Replicate  {  req_no,  next_frame_no }  => match  & mut  self . database  { 
375-                 Database :: Primary  {  db,  replica_streams,  frame_notifier }  => { 
376-                     dbg ! ( next_frame_no) ; 
387+             Message :: Replicate  { 
388+                 req_no, 
389+                 next_frame_no, 
390+             }  => match  & mut  self . database  { 
391+                 Database :: Primary  { 
392+                     db, 
393+                     replica_streams, 
394+                     frame_notifier, 
395+                 }  => { 
377396                    let  streamer = FrameStreamer  { 
378397                        logger :  db. logger ( ) , 
379398                        database_id :  DatabaseId :: from_name ( & self . db_name ) , 
@@ -396,15 +415,15 @@ impl Allocation {
396415                                * old_req_no = req_no; 
397416                                old_handle. abort ( ) ; 
398417                            } 
399-                         } , 
418+                         } 
400419                        Entry :: Vacant ( e)  => { 
401420                            let  handle = tokio:: spawn ( streamer. run ( ) ) ; 
402421                            // For some reason, not yielding causes the task not to be spawned 
403422                            tokio:: task:: yield_now ( ) . await ; 
404423                            e. insert ( ( req_no,  handle) ) ; 
405-                         } , 
424+                         } 
406425                    } 
407-                 } , 
426+                 } 
408427                Database :: Replica  {  .. }  => todo ! ( "not a primary!" ) , 
409428            } , 
410429            Message :: Frames ( frames)  => match  & mut  self . database  { 
@@ -459,7 +478,7 @@ impl Allocation {
459478
460479struct  Connection  { 
461480    id :  u32 , 
462-     conn :  Box < dyn  libsqlx :: Connection  +  Send > , 
481+     conn :  LibsqlConnection , 
463482    exit :  oneshot:: Receiver < ( ) > , 
464483    exec :  mpsc:: Receiver < ExecFn > , 
465484} 
@@ -470,7 +489,7 @@ impl Connection {
470489            tokio:: select! { 
471490                _ = & mut  self . exit => break , 
472491                Some ( exec)  = self . exec. recv( )  => { 
473-                     tokio:: task:: block_in_place( || exec( & mut  * self . conn) ) ; 
492+                     tokio:: task:: block_in_place( || exec( & mut  self . conn) ) ; 
474493                } 
475494            } 
476495        } 
0 commit comments