1+ use  std:: collections:: HashMap ; 
2+ use  std:: collections:: hash_map:: Entry ; 
13use  std:: path:: PathBuf ; 
24use  std:: sync:: Arc ; 
35use  std:: time:: { Duration ,  Instant } ; 
46
7+ use  bytes:: Bytes ; 
58use  libsqlx:: libsql:: { LibsqlDatabase ,  LogCompactor ,  LogFile ,  PrimaryType ,  ReplicaType } ; 
69use  libsqlx:: proxy:: WriteProxyDatabase ; 
7- use  libsqlx:: { Database  as  _,  DescribeResponse ,  Frame ,  InjectableDatabase ,  Injector ,  FrameNo } ; 
10+ use  libsqlx:: { 
11+     Database  as  _,  DescribeResponse ,  Frame ,  FrameNo ,  InjectableDatabase ,  Injector ,  LogReadError , 
12+     ReplicationLogger , 
13+ } ; 
814use  tokio:: sync:: { mpsc,  oneshot} ; 
915use  tokio:: task:: { block_in_place,  JoinSet } ; 
1016use  tokio:: time:: timeout; 
@@ -13,7 +19,7 @@ use crate::hrana;
1319use  crate :: hrana:: http:: handle_pipeline; 
1420use  crate :: hrana:: http:: proto:: { PipelineRequestBody ,  PipelineResponseBody } ; 
1521use  crate :: linc:: bus:: Dispatch ; 
16- use  crate :: linc:: proto:: { Enveloppe ,  Message ,   Frames } ; 
22+ use  crate :: linc:: proto:: { Enveloppe ,  Frames ,   Message } ; 
1723use  crate :: linc:: { Inbound ,  NodeId ,  Outbound } ; 
1824use  crate :: meta:: DatabaseId ; 
1925
@@ -65,7 +71,11 @@ impl libsqlx::Database for DummyDb {
6571type  ProxyDatabase  = WriteProxyDatabase < LibsqlDatabase < ReplicaType > ,  DummyDb > ; 
6672
6773pub  enum  Database  { 
68-     Primary ( LibsqlDatabase < PrimaryType > ) , 
74+     Primary  { 
75+         db :  LibsqlDatabase < PrimaryType > , 
76+         replica_streams :  HashMap < NodeId ,  ( u32 ,  tokio:: task:: JoinHandle < ( ) > ) > , 
77+         frame_notifier :  tokio:: sync:: watch:: Receiver < FrameNo > , 
78+     } , 
6979    Replica  { 
7080        db :  ProxyDatabase , 
7181        injector_handle :  mpsc:: Sender < Frames > , 
@@ -96,7 +106,7 @@ const MAX_INJECTOR_BUFFER_CAP: usize = 32;
96106struct  Replicator  { 
97107    dispatcher :  Arc < dyn  Dispatch > , 
98108    req_id :  u32 , 
99-     last_committed :  FrameNo , 
109+     next_frame_no :  FrameNo , 
100110    next_seq :  u32 , 
101111    database_id :  DatabaseId , 
102112    primary_node_id :  NodeId , 
@@ -106,30 +116,36 @@ struct Replicator {
106116
107117impl  Replicator  { 
108118    async  fn  run ( mut  self )  { 
109-         dbg ! ( ) ; 
110119        self . query_replicate ( ) . await ; 
111-         dbg ! ( ) ; 
112120        loop  { 
113121            match  timeout ( Duration :: from_secs ( 5 ) ,  self . receiver . recv ( ) ) . await  { 
114122                Ok ( Some ( Frames  { 
115-                     req_id, 
116-                     seq, 
123+                     req_no :   req_id, 
124+                     seq_no :   seq, 
117125                    frames, 
118126                } ) )  => { 
119127                    // ignore frames from a previous call to Replicate 
120-                     if  req_id != self . req_id  {  continue  } 
121-                     if  seq != self . next_seq  {  
128+                     if  req_id != self . req_id  { 
129+                         tracing:: debug!( req_id,  self . req_id,  "wrong req_id" ) ; 
130+                         continue ; 
131+                     } 
132+                     if  seq != self . next_seq  { 
122133                        // this is not the batch of frame we were expecting, drop what we have, and 
123134                        // ask again from last checkpoint 
135+                         tracing:: debug!( seq,  self . next_seq,  "wrong seq" ) ; 
124136                        self . query_replicate ( ) . await ; 
125137                        continue ; 
126138                    } ; 
127139                    self . next_seq  += 1 ; 
140+ 
141+                     tracing:: debug!( "injecting {} frames" ,  frames. len( ) ) ; 
142+ 
128143                    for  bytes in  frames { 
129144                        let  frame = Frame :: try_from_bytes ( bytes) . unwrap ( ) ; 
130145                        block_in_place ( || { 
131146                            if  let  Some ( last_committed)  = self . injector . inject ( frame) . unwrap ( )  { 
132-                                 self . last_committed  = last_committed; 
147+                                 tracing:: debug!( last_committed) ; 
148+                                 self . next_frame_no  = last_committed + 1 ; 
133149                            } 
134150                        } ) ; 
135151                    } 
@@ -151,22 +167,93 @@ impl Replicator {
151167                enveloppe :  Enveloppe  { 
152168                    database_id :  Some ( self . database_id ) , 
153169                    message :  Message :: Replicate  { 
154-                         next_frame_no :  self . last_committed  +  1 , 
155-                         req_id :  self . req_id  -  1 , 
170+                         next_frame_no :  self . next_frame_no , 
171+                         req_no :  self . req_id , 
156172                    } , 
157173                } , 
158174            } ) 
159-         . await ; 
175+             . await ; 
176+     } 
177+ } 
178+ 
179+ struct  FrameStreamer  { 
180+     logger :  Arc < ReplicationLogger > , 
181+     database_id :  DatabaseId , 
182+     node_id :  NodeId , 
183+     next_frame_no :  FrameNo , 
184+     req_no :  u32 , 
185+     seq_no :  u32 , 
186+     dipatcher :  Arc < dyn  Dispatch > , 
187+     notifier :  tokio:: sync:: watch:: Receiver < FrameNo > , 
188+     buffer :  Vec < Bytes > , 
189+ } 
190+ 
191+ // the maximum number of frame a Frame messahe is allowed to contain 
192+ const  FRAMES_MESSAGE_MAX_COUNT :  usize  = 5 ; 
193+ 
194+ impl  FrameStreamer  { 
195+     async  fn  run ( mut  self )  { 
196+         loop  { 
197+             match  block_in_place ( || self . logger . get_frame ( self . next_frame_no ) )  { 
198+                 Ok ( frame)  => { 
199+                     if  self . buffer . len ( )  > FRAMES_MESSAGE_MAX_COUNT  { 
200+                         self . send_frames ( ) . await ; 
201+                     } 
202+                     self . buffer . push ( frame. bytes ( ) ) ; 
203+                     self . next_frame_no  += 1 ; 
204+                 } 
205+                 Err ( LogReadError :: Ahead )  => { 
206+                     tracing:: debug!( "frame {} not yet avaiblable" ,  self . next_frame_no) ; 
207+                     if  !self . buffer . is_empty ( )  { 
208+                         self . send_frames ( ) . await ; 
209+                     } 
210+                     if  self . notifier . wait_for ( |fno| dbg ! ( * fno)  >= self . next_frame_no ) . await . is_err ( )  { 
211+                         break ; 
212+                     } 
213+                 } 
214+                 Err ( LogReadError :: Error ( _) )  => todo ! ( "handle log read error" ) , 
215+                 Err ( LogReadError :: SnapshotRequired )  => todo ! ( "handle reading from snapshot" ) , 
216+             } 
217+         } 
218+     } 
219+ 
220+     async  fn  send_frames ( & mut  self )  { 
221+         let  frames = std:: mem:: take ( & mut  self . buffer ) ; 
222+         let  outbound = Outbound  { 
223+             to :  self . node_id , 
224+             enveloppe :  Enveloppe  { 
225+                 database_id :  Some ( self . database_id ) , 
226+                 message :  Message :: Frames ( Frames  { 
227+                     req_no :  self . req_no , 
228+                     seq_no :  self . seq_no , 
229+                     frames, 
230+                 } ) , 
231+             } , 
232+         } ; 
233+         self . seq_no  += 1 ; 
234+         self . dipatcher . dispatch ( outbound) . await ; 
160235    } 
161236} 
162237
163238impl  Database  { 
164239    pub  fn  from_config ( config :  & AllocConfig ,  path :  PathBuf ,  dispatcher :  Arc < dyn  Dispatch > )  -> Self  { 
165240        match  config. db_config  { 
166241            DbConfig :: Primary  { }  => { 
167-                 let  db = LibsqlDatabase :: new_primary ( path,  Compactor ,  false ) . unwrap ( ) ; 
168-                 Self :: Primary ( db) 
169-             } 
242+                 let  ( sender,  receiver)  = tokio:: sync:: watch:: channel ( 0 ) ; 
243+                 let  db = LibsqlDatabase :: new_primary ( 
244+                     path, 
245+                     Compactor , 
246+                     false , 
247+                     Box :: new ( move  |fno| {  let  _ = sender. send ( fno) ;  }  ) , 
248+                 ) 
249+                 . unwrap ( ) ; 
250+ 
251+                 Self :: Primary  { 
252+                     db, 
253+                     replica_streams :  HashMap :: new ( ) , 
254+                     frame_notifier :  receiver, 
255+                 } 
256+             } , 
170257            DbConfig :: Replica  {  primary_node_id }  => { 
171258                let  rdb = LibsqlDatabase :: new_replica ( path,  MAX_INJECTOR_BUFFER_CAP ,  ( ) ) . unwrap ( ) ; 
172259                let  wdb = DummyDb ; 
@@ -178,7 +265,7 @@ impl Database {
178265                let  replicator = Replicator  { 
179266                    dispatcher, 
180267                    req_id :  0 , 
181-                     last_committed :  0 ,  // TODO: load the last commited from meta file 
268+                     next_frame_no :  0 ,  // TODO: load the last commited from meta file 
182269                    next_seq :  0 , 
183270                    database_id, 
184271                    primary_node_id, 
@@ -200,7 +287,7 @@ impl Database {
200287
201288    fn  connect ( & self )  -> Box < dyn  libsqlx:: Connection  + Send >  { 
202289        match  self  { 
203-             Database :: Primary ( db )  => Box :: new ( db. connect ( ) . unwrap ( ) ) , 
290+             Database :: Primary   {  db ,  ..  }  => Box :: new ( db. connect ( ) . unwrap ( ) ) , 
204291            Database :: Replica  {  db,  .. }  => Box :: new ( db. connect ( ) . unwrap ( ) ) , 
205292        } 
206293    } 
@@ -281,12 +368,44 @@ impl Allocation {
281368        ) ; 
282369
283370        match  msg. enveloppe . message  { 
284-             Message :: Handshake  {  .. }  => todo ! ( ) , 
371+             Message :: Handshake  {  .. }  => unreachable ! ( "handshake should have been caught earlier" ) , 
285372            Message :: ReplicationHandshake  {  .. }  => todo ! ( ) , 
286373            Message :: ReplicationHandshakeResponse  {  .. }  => todo ! ( ) , 
287-             Message :: Replicate  {  .. }  => match  & mut  self . database  { 
288-                 Database :: Primary ( _)  => todo ! ( ) , 
289-                 Database :: Replica  {  .. }  => ( ) , 
374+             Message :: Replicate  {  req_no,  next_frame_no }  => match  & mut  self . database  { 
375+                 Database :: Primary  {  db,  replica_streams,  frame_notifier }  => { 
376+                     dbg ! ( next_frame_no) ; 
377+                     let  streamer = FrameStreamer  { 
378+                         logger :  db. logger ( ) , 
379+                         database_id :  DatabaseId :: from_name ( & self . db_name ) , 
380+                         node_id :  msg. from , 
381+                         next_frame_no, 
382+                         req_no, 
383+                         seq_no :  0 , 
384+                         dipatcher :  self . dispatcher . clone ( ) , 
385+                         notifier :  frame_notifier. clone ( ) , 
386+                         buffer :  Vec :: new ( ) , 
387+                     } ; 
388+ 
389+                     match  replica_streams. entry ( msg. from )  { 
390+                         Entry :: Occupied ( mut  e)  => { 
391+                             let  ( old_req_no,  old_handle)  = e. get_mut ( ) ; 
392+                             // ignore req_no older that the current req_no 
393+                             if  * old_req_no < req_no { 
394+                                 let  handle = tokio:: spawn ( streamer. run ( ) ) ; 
395+                                 let  old_handle = std:: mem:: replace ( old_handle,  handle) ; 
396+                                 * old_req_no = req_no; 
397+                                 old_handle. abort ( ) ; 
398+                             } 
399+                         } , 
400+                         Entry :: Vacant ( e)  => { 
401+                             let  handle = tokio:: spawn ( streamer. run ( ) ) ; 
402+                             // For some reason, not yielding causes the task not to be spawned 
403+                             tokio:: task:: yield_now ( ) . await ; 
404+                             e. insert ( ( req_no,  handle) ) ; 
405+                         } , 
406+                     } 
407+                 } , 
408+                 Database :: Replica  {  .. }  => todo ! ( "not a primary!" ) , 
290409            } , 
291410            Message :: Frames ( frames)  => match  & mut  self . database  { 
292411                Database :: Replica  { 
@@ -297,7 +416,7 @@ impl Allocation {
297416                    * last_received_frame_ts = Some ( Instant :: now ( ) ) ; 
298417                    injector_handle. send ( frames) . await . unwrap ( ) ; 
299418                } 
300-                 Database :: Primary ( _ )  => todo ! ( "handle primary receiving txn" ) , 
419+                 Database :: Primary   {  ..  }  => todo ! ( "handle primary receiving txn" ) , 
301420            } , 
302421            Message :: ProxyRequest  {  .. }  => todo ! ( ) , 
303422            Message :: ProxyResponse  {  .. }  => todo ! ( ) , 
0 commit comments