108108
109109-define (SKIP_SEARCH_JUMP , 2048 ).
110110-define (READ_AHEAD_LIMIT , 4096 ).
111- -define (CAN_READ_AHEAD (LastDataSize ), LastDataSize =/= undefined andalso
112- LastDataSize =< ? READ_AHEAD_LIMIT ).
113111
114112% % Specification of the Log format.
115113% %
@@ -2895,66 +2893,24 @@ read_header0(State) ->
28952893 % % reads the next header if permitted
28962894 case can_read_next (State ) of
28972895 true ->
2898- do_read_header (State );
2896+ read_header_with_ra (State );
28992897 false ->
29002898 {end_of_stream , State }
29012899 end .
29022900
2903- do_read_header (#? MODULE {cfg = # cfg {directory = Dir ,
2904- shared = Shared },
2905- mode = # read {next_offset = NextChId0 ,
2906- position = Pos ,
2907- last_data_size = Lds ,
2908- read_ahead_data = undefined } = Read0 ,
2909- current_file = CurFile ,
2910- fd = Fd } = State ) ->
2911- ReadAheadOffset = read_ahead_offset (Lds ),
2912-
2913- case file :pread (Fd , Pos , ? HEADER_SIZE_B + ReadAheadOffset ) of
2914- {ok , <<? MAGIC :4 /unsigned ,
2915- ? VERSION :4 /unsigned ,
2916- ChType :8 /unsigned ,
2917- NumEntries :16 /unsigned ,
2918- NumRecords :32 /unsigned ,
2919- Timestamp :64 /signed ,
2920- Epoch :64 /unsigned ,
2921- NextChId0 :64 /unsigned ,
2922- Crc :32 /integer ,
2923- DataSize :32 /unsigned ,
2924- TrailerSize :32 /unsigned ,
2925- FilterSize :8 /unsigned ,
2926- _Reserved :24 ,
2927- MaybeFilterAndRest /binary >> = HeaderData0 }
2928- when ? CAN_READ_AHEAD (Lds ) ->
2929- case read_ahead_chunk (HeaderData0 , State ) of
2930- need_more_data ->
2931- Read1 = Read0 # read {read_ahead_data = undefined },
2932- maybe_return_header (State #? MODULE {mode = Read1 },
2933- HeaderData0 , MaybeFilterAndRest , undefined ,
2934- ChType , NumEntries , NumRecords , Timestamp ,
2935- Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
2936- FilterSize );
2937- R ->
2938- R
2939- end ;
2940- {ok , <<? MAGIC :4 /unsigned ,
2941- ? VERSION :4 /unsigned ,
2942- ChType :8 /unsigned ,
2943- NumEntries :16 /unsigned ,
2944- NumRecords :32 /unsigned ,
2945- Timestamp :64 /signed ,
2946- Epoch :64 /unsigned ,
2947- NextChId0 :64 /unsigned ,
2948- Crc :32 /integer ,
2949- DataSize :32 /unsigned ,
2950- TrailerSize :32 /unsigned ,
2951- FilterSize :8 /unsigned ,
2952- _Reserved :24 ,
2953- MaybeFilter /binary >> = HeaderData0 } ->
2954- maybe_return_header (State , HeaderData0 , MaybeFilter , undefined ,
2955- ChType , NumEntries , NumRecords , Timestamp ,
2956- Epoch , NextChId0 , Crc , DataSize ,
2957- TrailerSize , FilterSize );
2901+ read_header_with_ra (#? MODULE {cfg = # cfg {directory = Dir ,
2902+ shared = Shared },
2903+ mode = # read {next_offset = NextChId0 ,
2904+ position = Pos ,
2905+ last_data_size = Lds ,
2906+ read_ahead_data = undefined } = Read0 ,
2907+ current_file = CurFile ,
2908+ fd = Fd } = State ) ->
2909+ ReadAheadSize = read_ahead_size (Lds ),
2910+
2911+ case file :pread (Fd , Pos , ? HEADER_SIZE_B + ReadAheadSize ) of
2912+ {ok , Bin } when byte_size (Bin ) >= ? HEADER_SIZE_B ->
2913+ parse_header (Bin , State );
29582914 {ok , Bin } when byte_size (Bin ) < ? HEADER_SIZE_B ->
29592915 % % partial header read
29602916 % % this can happen when a replica reader reads ahead
@@ -3008,45 +2964,55 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
30082964 Invalid ->
30092965 {error , {invalid_chunk_header , Invalid }}
30102966 end ;
3011- do_read_header (#? MODULE {mode = # read {read_ahead_data = RAD } = Read0 } = State ) ->
3012- case read_ahead_chunk (RAD , State ) of
3013- need_more_data ->
3014- % % we don't have enough data in memory
2967+ read_header_with_ra (#? MODULE {mode = # read {last_data_size = Lds ,
2968+ read_ahead_data = RAD } = Read0 } = State ) ->
2969+ case byte_size (RAD ) > ? HEADER_SIZE_B + Lds of
2970+ true ->
2971+ case parse_header (RAD , State ) of
2972+ need_more_data ->
2973+ % % we don't have enough data in memory
2974+ Read1 = Read0 # read {read_ahead_data = undefined },
2975+ read_header_with_ra (State #? MODULE {mode = Read1 });
2976+ Result ->
2977+ Result
2978+ end ;
2979+ false ->
30152980 Read1 = Read0 # read {read_ahead_data = undefined },
3016- do_read_header (State #? MODULE {mode = Read1 });
3017- R ->
3018- R
2981+ read_header_with_ra (State #? MODULE {mode = Read1 })
30192982 end .
30202983
3021- read_ahead_chunk (<<? MAGIC :4 /unsigned ,
3022- ? VERSION :4 /unsigned ,
3023- ChType :8 /unsigned ,
3024- NumEntries :16 /unsigned ,
3025- NumRecords :32 /unsigned ,
3026- Timestamp :64 /signed ,
3027- Epoch :64 /unsigned ,
3028- NextChId0 :64 /unsigned ,
3029- Crc :32 /integer ,
3030- DataSize :32 /unsigned ,
3031- TrailerSize :32 /unsigned ,
3032- FilterSize :8 /unsigned ,
3033- _Reserved :24 ,
3034- MaybeFilterAndRest /binary >> = HeaderData0 ,
3035- #? MODULE {mode = # read {type = RType ,
3036- chunk_selector = Selector ,
3037- next_offset = NextChId0 } = Read0 } = State ) ->
2984+
2985+ parse_header (<<? MAGIC :4 /unsigned ,
2986+ ? VERSION :4 /unsigned ,
2987+ ChType :8 /unsigned ,
2988+ NumEntries :16 /unsigned ,
2989+ NumRecords :32 /unsigned ,
2990+ Timestamp :64 /signed ,
2991+ Epoch :64 /unsigned ,
2992+ NextChId0 :64 /unsigned ,
2993+ Crc :32 /integer ,
2994+ DataSize :32 /unsigned ,
2995+ TrailerSize :32 /unsigned ,
2996+ FilterSize :8 /unsigned ,
2997+ _Reserved :24 ,
2998+ MaybeFilterAndRest /binary >> = HeaderData0 ,
2999+ #? MODULE {mode = # read {type = RType ,
3000+ chunk_selector = Selector ,
3001+ next_offset = NextChId0 } = Read0 } = State ) ->
30383002 {ToSkip , ToSend } = select_amount_to_send (RType , Selector , ChType ,
3039- FilterSize , DataSize ,
3040- TrailerSize ),
3041- case byte_size (MaybeFilterAndRest ) of
3042- RAS when RAS >= ( ToSkip + ToSend ) ->
3003+ FilterSize , DataSize ,
3004+ TrailerSize ),
3005+ case byte_size (MaybeFilterAndRest ) >= ( ToSkip + ToSend ) of
3006+ true ->
30433007 % % we read everything we needed
30443008 {ReadAheadData , Content } =
30453009 case MaybeFilterAndRest of
30463010 <<_Skip :ToSkip /binary ,
30473011 Ctnt :ToSend /binary ,
30483012 Rest /binary >>
3049- when byte_size (Rest ) > ? HEADER_SIZE_B + ? DEFAULT_FILTER_SIZE ->
3013+ when byte_size (Rest ) > ? HEADER_SIZE_B + ? DEFAULT_FILTER_SIZE ->
3014+ % % the remainder is larger than 64 bytes so it is worth keeping
3015+ % % around
30503016 {Rest , Ctnt };
30513017 <<_Skip :ToSkip /binary ,
30523018 Ctnt :ToSend /binary ,
@@ -3059,20 +3025,27 @@ read_ahead_chunk(<<?MAGIC:4/unsigned,
30593025 ChType , NumEntries , NumRecords , Timestamp ,
30603026 Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
30613027 FilterSize );
3062- _ ->
3063- need_more_data
3028+ false ->
3029+ % % having to throw away the read ahead data here
3030+ Read1 = Read0 # read {read_ahead_data = undefined },
3031+ maybe_return_header (State #? MODULE {mode = Read1 },
3032+ HeaderData0 , MaybeFilterAndRest , undefined ,
3033+ ChType , NumEntries , NumRecords , Timestamp ,
3034+ Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
3035+ FilterSize )
30643036 end ;
3065- read_ahead_chunk (_ , _ ) ->
3037+ parse_header (_ , _ ) ->
30663038 need_more_data .
30673039
3068- read_ahead_offset (LastDataSize ) ->
3069- case LastDataSize of
3070- LastDataSize when ? CAN_READ_AHEAD (LastDataSize ) ->
3040+ read_ahead_size (LastDataSize ) ->
3041+ case LastDataSize =/= undefined andalso
3042+ LastDataSize =< ? READ_AHEAD_LIMIT of
3043+ true ->
30713044 % % the previous chunk was small, try to read
30723045 % % the next chunk fully in one read
3073- % % this can save us the sendfile call later
3046+ % % this can save us a system call later
30743047 ? DEFAULT_FILTER_SIZE + ? READ_AHEAD_LIMIT ;
3075- _ ->
3048+ false ->
30763049 % % optimistically read the default filter size.
30773050 % % this amounts to 64 bytes with the header (small binary)
30783051 % % and it may save us a syscall reading the filter
@@ -3087,7 +3060,6 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef},
30873060 fd = Fd } = State , HeaderData0 , MaybeFilter , Content ,
30883061 ChType , NumEntries , NumRecords , Timestamp , Epoch ,
30893062 NextChId0 , Crc , DataSize , TrailerSize , FilterSize ) ->
3090- <<HeaderData :? HEADER_SIZE_B /binary , _ /binary >> = HeaderData0 ,
30913063
30923064 ChunkFilter = case MaybeFilter of
30933065 <<F :FilterSize /binary , _ /binary >> ->
@@ -3112,6 +3084,7 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef},
31123084
31133085 case osiris_bloom :is_match (ChunkFilter , Filter ) of
31143086 true ->
3087+ <<HeaderData :? HEADER_SIZE_B /binary , _ /binary >> = HeaderData0 ,
31153088 {ok , #{chunk_id => NextChId0 ,
31163089 epoch => Epoch ,
31173090 type => ChType ,
0 commit comments