431431 chunk_selector :: all | user_data ,
432432 position = 0 :: non_neg_integer (),
433433 filter :: undefined | osiris_bloom :mstate (),
434- last_data_size = undefined :: undefined | non_neg_integer ()}).
434+ last_data_size = undefined :: undefined | non_neg_integer (),
435+ read_ahead_data = undefined :: undefined | binary ()}).
435436-record (write ,
436437 {type = writer :: writer | acceptor ,
437438 segment_size = {? LOG_HEADER_SIZE , 0 } :: {non_neg_integer (), non_neg_integer ()},
@@ -2899,11 +2900,10 @@ read_header0(State) ->
28992900
29002901do_read_header (#? MODULE {cfg = # cfg {directory = Dir ,
29012902 shared = Shared },
2902- mode = # read {type = RType ,
2903- chunk_selector = Selector ,
2904- next_offset = NextChId0 ,
2903+ mode = # read {next_offset = NextChId0 ,
29052904 position = Pos ,
2906- last_data_size = Lds } = Read0 ,
2905+ last_data_size = Lds ,
2906+ read_ahead_data = undefined } = Read0 ,
29072907 current_file = CurFile ,
29082908 fd = Fd } = State ) ->
29092909 ReadAheadOffset = read_ahead_offset (Lds ),
@@ -2924,26 +2924,17 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
29242924 _Reserved :24 ,
29252925 MaybeFilterAndRest /binary >> = HeaderData0 }
29262926 when ? CAN_READ_AHEAD (Lds ) ->
2927- {ToSkip , ToSend } = select_amount_to_send (RType , Selector , ChType ,
2928- FilterSize , DataSize ,
2929- TrailerSize ),
2930- % % summing what we need to skip and send gives us the number
2931- % % of bytes we need to read, we make sure we read it ahead
2932- Content = case ReadAheadOffset of
2933- RAO when RAO >= (ToSkip + ToSend ) ->
2934- % % we read everything we needed
2935- <<_Skip :ToSkip /binary ,
2936- Ctnt :ToSend /binary ,
2937- _Rest /binary >> = MaybeFilterAndRest ,
2938- Ctnt ;
2939- _ ->
2940- % % we did not read enough, the caller will have to do it
2941- undefined
2942- end ,
2943- maybe_return_header (State , HeaderData0 , MaybeFilterAndRest , Content ,
2944- ChType , NumEntries , NumRecords , Timestamp ,
2945- Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
2946- FilterSize );
2927+ case read_ahead_chunk (HeaderData0 , State ) of
2928+ need_more_data ->
2929+ Read1 = Read0 # read {read_ahead_data = undefined },
2930+ maybe_return_header (State #? MODULE {mode = Read1 },
2931+ HeaderData0 , MaybeFilterAndRest , undefined ,
2932+ ChType , NumEntries , NumRecords , Timestamp ,
2933+ Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
2934+ FilterSize );
2935+ R ->
2936+ R
2937+ end ;
29472938 {ok , <<? MAGIC :4 /unsigned ,
29482939 ? VERSION :4 /unsigned ,
29492940 ChType :8 /unsigned ,
@@ -3014,8 +3005,64 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
30143005 {error , {unexpected_chunk_id , UnexpectedChId , NextChId0 }};
30153006 Invalid ->
30163007 {error , {invalid_chunk_header , Invalid }}
3008+ end ;
3009+ do_read_header (#? MODULE {mode = # read {read_ahead_data = RAD } = Read0 } = State ) ->
3010+ case read_ahead_chunk (RAD , State ) of
3011+ need_more_data ->
3012+ % % we don't have enough data in memory
3013+ Read1 = Read0 # read {read_ahead_data = undefined },
3014+ do_read_header (State #? MODULE {mode = Read1 });
3015+ R ->
3016+ R
30173017 end .
30183018
3019+ read_ahead_chunk (<<? MAGIC :4 /unsigned ,
3020+ ? VERSION :4 /unsigned ,
3021+ ChType :8 /unsigned ,
3022+ NumEntries :16 /unsigned ,
3023+ NumRecords :32 /unsigned ,
3024+ Timestamp :64 /signed ,
3025+ Epoch :64 /unsigned ,
3026+ NextChId0 :64 /unsigned ,
3027+ Crc :32 /integer ,
3028+ DataSize :32 /unsigned ,
3029+ TrailerSize :32 /unsigned ,
3030+ FilterSize :8 /unsigned ,
3031+ _Reserved :24 ,
3032+ MaybeFilterAndRest /binary >> = HeaderData0 ,
3033+ #? MODULE {mode = # read {type = RType ,
3034+ chunk_selector = Selector ,
3035+ next_offset = NextChId0 } = Read0 } = State ) ->
3036+ {ToSkip , ToSend } = select_amount_to_send (RType , Selector , ChType ,
3037+ FilterSize , DataSize ,
3038+ TrailerSize ),
3039+ case byte_size (MaybeFilterAndRest ) of
3040+ RAS when RAS >= (ToSkip + ToSend ) ->
3041+ % % we read everything we needed
3042+ {ReadAheadData , Content } =
3043+ case MaybeFilterAndRest of
3044+ <<_Skip :ToSkip /binary ,
3045+ Ctnt :ToSend /binary ,
3046+ Rest /binary >>
3047+ when byte_size (Rest ) > ? HEADER_SIZE_B + ? DEFAULT_FILTER_SIZE ->
3048+ {Rest , Ctnt };
3049+ <<_Skip :ToSkip /binary ,
3050+ Ctnt :ToSend /binary ,
3051+ _Rest /binary >> ->
3052+ {undefined , Ctnt }
3053+ end ,
3054+ Read1 = Read0 # read {read_ahead_data = ReadAheadData },
3055+ maybe_return_header (State #? MODULE {mode = Read1 },
3056+ HeaderData0 , MaybeFilterAndRest , Content ,
3057+ ChType , NumEntries , NumRecords , Timestamp ,
3058+ Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
3059+ FilterSize );
3060+ _ ->
3061+ need_more_data
3062+ end ;
3063+ read_ahead_chunk (_ , _ ) ->
3064+ need_more_data .
3065+
30193066read_ahead_offset (LastDataSize ) ->
30203067 case LastDataSize of
30213068 LastDataSize when ? CAN_READ_AHEAD (LastDataSize ) ->
0 commit comments