@@ -93,7 +93,8 @@ all_tests() ->
9393 init_partial_writes ,
9494 init_with_unexpected_file ,
9595 overview_with_missing_segment ,
96- overview_with_missing_index_at_start
96+ overview_with_missing_index_at_start ,
97+ read_ahead
9798 % read_header_ahead_offset_reader,
9899 % read_header_ahead_offset_reader_filter
99100 ].
@@ -385,6 +386,83 @@ iterator_read_chunk_mixed_sizes_with_credit(Config) ->
385386 osiris_log :close (S1 ),
386387 ok .
387388
389+ read_ahead (Config ) ->
390+ RAL = 4096 , % % read ahead limit
391+ HS = ? HEADER_SIZE_B ,
392+ Conf = ? config (osiris_conf , Config ),
393+ W = osiris_log :init (Conf ),
394+ Shared = osiris_log :get_shared (W ),
395+ RConf = Conf #{shared => Shared , transport => tcp },
396+ {ok , R } = osiris_log :init_offset_reader (0 , RConf ),
397+
398+ % % server, we will read stream data from this socket
399+ {ok , SLS } = gen_tcp :listen (0 , [binary , {packet , 0 },
400+ {active , false }]),
401+ {ok , Port } = inet :port (SLS ),
402+
403+ % % client, osiris will send to this socket
404+ {ok , CS } = gen_tcp :connect (" localhost" , Port ,
405+ [binary , {packet , 0 }]),
406+
407+ {ok , SS } = gen_tcp :accept (SLS ),
408+
409+ Tests =
410+ [
411+ fun (write , #{w := W0 }) ->
412+ Entries = [<<" hiiiiiiiii" >>, <<" hooooooo" >>],
413+ {_ , W1 } = write_committed (Entries , W0 ),
414+ W1 ;
415+ (read , #{r := R0 , tracer := T }) ->
416+ % % first chunk, we read the header and the chunk
417+ Entries = [<<" hiiiiiiiii" >>, <<" hooooooo" >>],
418+ [_ , _ , D , _ ] = fake_chunk (Entries , ? LINE , 1 , 100 ),
419+ {ok , R1 } = osiris_log :send_file (CS , R0 ),
420+ {ok , Read } = recv (SS , byte_size (iolist_to_binary (D )) + HS ),
421+ ? assertEqual (iolist_to_binary (D ), binary :part (Read , HS , byte_size (Read ) - HS )),
422+ ? assertEqual (2 , length (osiris_tracer :calls (T ))),
423+ R1
424+ end ,
425+ fun (write , #{w := W0 }) ->
426+ {_ , W1 } = write_committed ([<<" hi" >>, <<" ho" >>], W0 ),
427+ W1 ;
428+ (read , #{r := R0 , tracer := T }) ->
429+ % % small chunk, we read it ahead already
430+ [_ , _ , D , _ ] = fake_chunk ([<<" hi" >>, <<" ho" >>], ? LINE , 1 , 100 ),
431+ {ok , R1 } = osiris_log :send_file (CS , R0 ),
432+ {ok , Read } = recv (SS , byte_size (iolist_to_binary (D )) + HS ),
433+ ? assertEqual (iolist_to_binary (D ), binary :part (Read , HS , byte_size (Read ) - HS )),
434+ ? assertEqual (0 , length (osiris_tracer :calls (T ))),
435+ R1
436+ end ,
437+ fun (write , #{w := W0 }) ->
438+ Entries = [<<" foo" >>, binary :copy (<<" b" >>, RAL * 2 )],
439+ {_ , W1 } = write_committed (Entries , W0 ),
440+ W1 ;
441+ (read , #{r := R0 , tracer := T }) ->
442+ % % large chunk, we will need to read from the file system
443+ Entries = [<<" foo" >>, binary :copy (<<" b" >>, RAL * 2 )],
444+ [_ , _ , D , _ ] = fake_chunk (Entries , ? LINE , 1 , 100 ),
445+ {ok , R1 } = osiris_log :send_file (CS , R0 ),
446+ {ok , Read } = recv (SS , byte_size (iolist_to_binary (D )) + HS ),
447+ ? assertEqual (iolist_to_binary (D ), binary :part (Read , HS , byte_size (Read ) - HS )),
448+ ? assertEqual (0 , length (osiris_tracer :calls (T , file , pread ))),
449+ ? assertEqual (1 , length (osiris_tracer :calls (T , file , sendfile ))),
450+ R1
451+ end
452+ ],
453+
454+ #{w := W1 , r := R1 } = run_read_ahead_tests (Tests , offset ,
455+ ? DEFAULT_FILTER_SIZE , W , R ),
456+
457+ ok = gen_tcp :close (CS ),
458+
459+ ok = gen_tcp :close (SS ),
460+ ok = gen_tcp :close (SLS ),
461+
462+ osiris_log :close (R1 ),
463+ osiris_log :close (W1 ),
464+ ok .
465+
388466iterator_read_chunk_with_read_ahead (Config ) ->
389467 % % the test makes sure reading ahead on header reading does not break
390468 % % the iterator
@@ -402,25 +480,25 @@ iterator_read_chunk_with_read_ahead(Config) ->
402480 W1 ;
403481 (read , #{r := R0 , tracer := T }) ->
404482 % % first chunk, there won't be any data size hints in the reader
405- {ok , H , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
483+ {ok , _ , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
406484 {{_ , <<" ho" >>}, I1 } = osiris_log :iterator_next (I0 ),
407485 {{_ , <<" hi" >>}, I2 } = osiris_log :iterator_next (I1 ),
408486 ? assertMatch (end_of_chunk , osiris_log :iterator_next (I2 )),
409487 ? assertEqual (2 , length (osiris_tracer :calls (T ))),
410- { H , R1 }
488+ R1
411489 end ,
412490 fun (write , #{w := W0 }) ->
413491 % % this one will be read ahead
414492 EntriesRev = [<<" foo" >>, <<" bar" >>],
415493 {_ , W1 } = write_committed (EntriesRev , W0 ),
416494 W1 ;
417495 (read , #{r := R0 , tracer := T }) ->
418- {ok , H , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
496+ {ok , _ , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
419497 {{_ , <<" bar" >>}, I1 } = osiris_log :iterator_next (I0 ),
420498 {{_ , <<" foo" >>}, I2 } = osiris_log :iterator_next (I1 ),
421499 ? assertMatch (end_of_chunk , osiris_log :iterator_next (I2 )),
422500 ? assertEqual (0 , length (osiris_tracer :calls (T ))),
423- { H , R1 }
501+ R1
424502 end ,
425503 fun (write , #{w := W0 }) ->
426504 % % this one will be read ahead
@@ -430,12 +508,12 @@ iterator_read_chunk_with_read_ahead(Config) ->
430508 W1 ;
431509 (read , #{r := R0 , tracer := T }) ->
432510 E1 = binary :copy (<<" b" >>, RAL - 200 ),
433- {ok , H , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
511+ {ok , _ , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
434512 {{_ , <<" aaa" >>}, I1 } = osiris_log :iterator_next (I0 ),
435513 {{_ , E1 }, I2 } = osiris_log :iterator_next (I1 ),
436514 ? assertMatch (end_of_chunk , osiris_log :iterator_next (I2 )),
437515 ? assertEqual (0 , length (osiris_tracer :calls (T ))),
438- { H , R1 }
516+ R1
439517 end ,
440518 fun (write , #{w := W0 }) ->
441519 % % this one is too big to be read ahead
@@ -445,12 +523,12 @@ iterator_read_chunk_with_read_ahead(Config) ->
445523 W1 ;
446524 (read , #{r := R0 , tracer := T }) ->
447525 E1 = binary :copy (<<" b" >>, RAL * 2 ),
448- {ok , H , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
526+ {ok , _ , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
449527 {{_ , <<" aaa" >>}, I1 } = osiris_log :iterator_next (I0 ),
450528 {{_ , E1 }, I2 } = osiris_log :iterator_next (I1 ),
451529 ? assertMatch (end_of_chunk , osiris_log :iterator_next (I2 )),
452530 ? assertEqual (2 , length (osiris_tracer :calls (T ))),
453- { H , R1 }
531+ R1
454532 end
455533 ],
456534
@@ -2191,20 +2269,14 @@ run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0) ->
21912269 #{w => W }
21922270 end , #{w => Wr0 }, Tests ),
21932271 R = lists :foldl (fun (F , Acc ) ->
2194- Tracer = osiris_tracer :start ({file , pread , '_' }),
2195- {_ , R0 } = F (read , Acc #{tracer => Tracer }),
2272+ Tracer = osiris_tracer :start ([{file , pread , '_' },
2273+ {file , sendfile , '_' }]),
2274+ R0 = F (read , Acc #{tracer => Tracer }),
21962275 osiris_tracer :stop (Tracer ),
2197- % R1 = update_read(H, R0),
21982276 #{r => R0 , rtype => RType , fsize => FSize }
21992277 end , #{r => Rd0 , rtype => RType , fsize => FSize }, Tests ),
22002278 maps :merge (W , R ).
22012279
2202- - spec update_read (map (), osiris_log :state ()) -> osiris_log :state ().
2203- update_read (#{chunk_id := ChId ,
2204- num_records := NumRecords ,
2205- next_position := NextPos }, R ) ->
2206- osiris_log :update_read (R , ChId , NumRecords , NextPos ).
2207-
22082280truncate_at (File , Pos ) ->
22092281 {ok , Fd } = file :open (File , [raw , binary , read , write ]),
22102282 % truncate the file so that chunk <<four>> is missing and <<three>> is corrupted
@@ -2311,3 +2383,19 @@ fake_chunk(Blobs, Ts, Epoch, NextChId, FSize) ->
23112383 element (1 ,
23122384 osiris_log :make_chunk (Blobs , <<>>, 0 , Ts , Epoch , NextChId ,
23132385 FSize )).
2386+
2387+ recv (Socket , Expected ) ->
2388+ recv (Socket , Expected , <<>>).
2389+
2390+ recv (_Socket , 0 , Acc ) ->
2391+ Acc ;
2392+ recv (Socket , Expected , Acc ) ->
2393+ case gen_tcp :recv (Socket , Expected , 10_000 ) of
2394+ {ok , Data } when byte_size (Data ) == Expected ->
2395+ {ok , Data };
2396+ {ok , Data } when byte_size (Data ) < Expected ->
2397+ {ok , recv (Socket , Expected - byte_size (Data ),
2398+ <<Acc /binary , Data /binary >>)};
2399+ Other ->
2400+ Other
2401+ end .
0 commit comments