11use  std:: collections:: HashMap ; 
2+ use  std:: path:: PathBuf ; 
23
3- use  tokio:: { sync:: { mpsc,  oneshot} ,  task:: { JoinSet ,  block_in_place} } ; 
4+ use  libsqlx:: Database  as  _; 
5+ use  libsqlx:: libsql:: { LibsqlDatabase ,  LogCompactor ,  LogFile ,  PrimaryType } ; 
6+ use  tokio:: task:: { block_in_place,  JoinSet } ; 
7+ use  tokio:: sync:: { mpsc,  oneshot} ; 
8+ 
9+ use  self :: config:: { AllocConfig ,  DbConfig } ; 
410
511pub  mod  config; 
612
713type  ExecFn  = Box < dyn  FnOnce ( & mut  dyn  libsqlx:: Connection )  + Send > ; 
814
915#[ derive( Clone ) ]  
10- struct  ConnectionId  { 
16+ pub   struct  ConnectionId  { 
1117    id :  u32 , 
1218    close_sender :  mpsc:: Sender < ( ) > , 
1319} 
1420
15- enum  AllocationMessage  { 
21+ pub   enum  AllocationMessage  { 
1622    /// Execute callback against connection 
1723Exec  { 
1824        connection_id :  ConnectionId , 
@@ -22,30 +28,61 @@ enum AllocationMessage {
2228NewConnExec  { 
2329        exec :  ExecFn , 
2430        ret :  oneshot:: Sender < ConnectionId > , 
25-     } 
31+     } , 
32+ } 
33+ 
34+ pub  enum  Database  { 
35+     Primary ( libsqlx:: libsql:: LibsqlDatabase < PrimaryType > ) , 
2636} 
2737
28- enum  Database  { } 
38+ struct  Compactor ; 
39+ 
40+ impl  LogCompactor  for  Compactor  { 
41+     fn  should_compact ( & self ,  _log :  & LogFile )  -> bool  { 
42+         false 
43+     } 
44+ 
45+     fn  compact ( 
46+         & self , 
47+         _log :  LogFile , 
48+         _path :  std:: path:: PathBuf , 
49+         _size_after :  u32 , 
50+     )  -> Result < ( ) ,  Box < dyn  std:: error:: Error  + Sync  + Send  + ' static > >  { 
51+         todo ! ( ) 
52+     } 
53+ } 
2954
3055impl  Database  { 
56+     pub  fn  from_config ( config :  & AllocConfig ,  path :  PathBuf )  -> Self  { 
57+         match  config. db_config  { 
58+             DbConfig :: Primary  { }  => { 
59+                 let  db = LibsqlDatabase :: new_primary ( path,  Compactor ,  false ) . unwrap ( ) ; 
60+                 Self :: Primary ( db) 
61+             } 
62+             DbConfig :: Replica  {  .. }  => todo ! ( ) , 
63+         } 
64+     } 
65+ 
3166    fn  connect ( & self )  -> Box < dyn  libsqlx:: Connection  + Send >  { 
32-         todo ! ( ) ; 
67+         match  self  { 
68+             Database :: Primary ( db)  => Box :: new ( db. connect ( ) . unwrap ( ) ) , 
69+         } 
3370    } 
3471} 
3572
3673pub  struct  Allocation  { 
37-     inbox :  mpsc:: Receiver < AllocationMessage > , 
38-     database :  Database , 
74+     pub   inbox :  mpsc:: Receiver < AllocationMessage > , 
75+     pub   database :  Database , 
3976    /// senders to the spawned connections 
40- connections :  HashMap < u32 ,  mpsc:: Sender < ExecFn > > , 
77+ pub   connections :  HashMap < u32 ,  mpsc:: Sender < ExecFn > > , 
4178    /// spawned connection futures, returning their connection id on completion. 
42- connections_futs :  JoinSet < u32 > , 
43-     next_conn_id :  u32 , 
44-     max_concurrent_connections :  u32 , 
79+ pub   connections_futs :  JoinSet < u32 > , 
80+     pub   next_conn_id :  u32 , 
81+     pub   max_concurrent_connections :  u32 , 
4582} 
4683
4784impl  Allocation  { 
48-     async  fn  run ( mut  self )  { 
85+     pub   async  fn  run ( mut  self )  { 
4986        loop  { 
5087            tokio:: select! { 
5188                Some ( msg)  = self . inbox. recv( )  => { 
@@ -86,23 +123,19 @@ impl Allocation {
86123            exec :  exec_receiver, 
87124        } ; 
88125
89- 
90126        self . connections_futs . spawn ( conn. run ( ) ) ; 
91127        // This should never block! 
92128        assert ! ( exec_sender. try_send( exec) . is_ok( ) ) ; 
93129        assert ! ( self . connections. insert( id,  exec_sender) . is_none( ) ) ; 
94130
95-         ConnectionId  { 
96-             id, 
97-             close_sender, 
98-         } 
131+         ConnectionId  {  id,  close_sender } 
99132    } 
100133
101134    fn  next_conn_id ( & mut  self )  -> u32  { 
102135        loop  { 
103136            self . next_conn_id  = self . next_conn_id . wrapping_add ( 1 ) ; 
104137            if  !self . connections . contains_key ( & self . next_conn_id )  { 
105-                 return  self . next_conn_id 
138+                 return  self . next_conn_id ; 
106139            } 
107140        } 
108141    } 
0 commit comments