108108
109109-define (SKIP_SEARCH_JUMP , 2048 ).
110110-define (READ_AHEAD_LIMIT , 4096 ).
111- -define (CAN_READ_AHEAD (LastDataSize ), LastDataSize =/= undefined andalso
112- LastDataSize =< ? READ_AHEAD_LIMIT ).
113111
114112% % Specification of the Log format.
115113% %
@@ -2895,66 +2893,24 @@ read_header0(State) ->
28952893 % % reads the next header if permitted
28962894 case can_read_next (State ) of
28972895 true ->
2898- do_read_header (State );
2896+ read_header_with_ra (State );
28992897 false ->
29002898 {end_of_stream , State }
29012899 end .
29022900
2903- do_read_header (#? MODULE {cfg = # cfg {directory = Dir ,
2904- shared = Shared },
2905- mode = # read {next_offset = NextChId0 ,
2906- position = Pos ,
2907- last_data_size = Lds ,
2908- read_ahead_data = undefined } = Read0 ,
2909- current_file = CurFile ,
2910- fd = Fd } = State ) ->
2911- ReadAheadOffset = read_ahead_offset (Lds ),
2912-
2913- case file :pread (Fd , Pos , ? HEADER_SIZE_B + ReadAheadOffset ) of
2914- {ok , <<? MAGIC :4 /unsigned ,
2915- ? VERSION :4 /unsigned ,
2916- ChType :8 /unsigned ,
2917- NumEntries :16 /unsigned ,
2918- NumRecords :32 /unsigned ,
2919- Timestamp :64 /signed ,
2920- Epoch :64 /unsigned ,
2921- NextChId0 :64 /unsigned ,
2922- Crc :32 /integer ,
2923- DataSize :32 /unsigned ,
2924- TrailerSize :32 /unsigned ,
2925- FilterSize :8 /unsigned ,
2926- _Reserved :24 ,
2927- MaybeFilterAndRest /binary >> = HeaderData0 }
2928- when ? CAN_READ_AHEAD (Lds ) ->
2929- case read_ahead_chunk (HeaderData0 , State ) of
2930- need_more_data ->
2931- Read1 = Read0 # read {read_ahead_data = undefined },
2932- maybe_return_header (State #? MODULE {mode = Read1 },
2933- HeaderData0 , MaybeFilterAndRest , undefined ,
2934- ChType , NumEntries , NumRecords , Timestamp ,
2935- Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
2936- FilterSize );
2937- R ->
2938- R
2939- end ;
2940- {ok , <<? MAGIC :4 /unsigned ,
2941- ? VERSION :4 /unsigned ,
2942- ChType :8 /unsigned ,
2943- NumEntries :16 /unsigned ,
2944- NumRecords :32 /unsigned ,
2945- Timestamp :64 /signed ,
2946- Epoch :64 /unsigned ,
2947- NextChId0 :64 /unsigned ,
2948- Crc :32 /integer ,
2949- DataSize :32 /unsigned ,
2950- TrailerSize :32 /unsigned ,
2951- FilterSize :8 /unsigned ,
2952- _Reserved :24 ,
2953- MaybeFilter /binary >> = HeaderData0 } ->
2954- maybe_return_header (State , HeaderData0 , MaybeFilter , undefined ,
2955- ChType , NumEntries , NumRecords , Timestamp ,
2956- Epoch , NextChId0 , Crc , DataSize ,
2957- TrailerSize , FilterSize );
2901+ read_header_with_ra (#? MODULE {cfg = # cfg {directory = Dir ,
2902+ shared = Shared },
2903+ mode = # read {next_offset = NextChId0 ,
2904+ position = Pos ,
2905+ last_data_size = Lds ,
2906+ read_ahead_data = undefined } = Read0 ,
2907+ current_file = CurFile ,
2908+ fd = Fd } = State ) ->
2909+ ReadAheadSize = read_ahead_size (Lds ),
2910+
2911+ case file :pread (Fd , Pos , ? HEADER_SIZE_B + ReadAheadSize ) of
2912+ {ok , Bin } when byte_size (Bin ) >= ? HEADER_SIZE_B ->
2913+ parse_header (Bin , State );
29582914 {ok , Bin } when byte_size (Bin ) < ? HEADER_SIZE_B ->
29592915 % % partial header read
29602916 % % this can happen when a replica reader reads ahead
@@ -3008,45 +2964,47 @@ do_read_header(#?MODULE{cfg = #cfg{directory = Dir,
30082964 Invalid ->
30092965 {error , {invalid_chunk_header , Invalid }}
30102966 end ;
3011- do_read_header (#? MODULE {mode = # read {read_ahead_data = RAD } = Read0 } = State ) ->
3012- case read_ahead_chunk (RAD , State ) of
2967+ read_header_with_ra (#? MODULE {mode = # read {read_ahead_data = RAD } = Read0 } = State ) ->
2968+ case parse_header (RAD , State ) of
30132969 need_more_data ->
30142970 % % we don't have enough data in memory
30152971 Read1 = Read0 # read {read_ahead_data = undefined },
3016- do_read_header (State #? MODULE {mode = Read1 });
3017- R ->
3018- R
2972+ read_header_with_ra (State #? MODULE {mode = Read1 });
2973+ Result ->
2974+ Result
30192975 end .
30202976
3021- read_ahead_chunk (<<? MAGIC :4 /unsigned ,
3022- ? VERSION :4 /unsigned ,
3023- ChType :8 /unsigned ,
3024- NumEntries :16 /unsigned ,
3025- NumRecords :32 /unsigned ,
3026- Timestamp :64 /signed ,
3027- Epoch :64 /unsigned ,
3028- NextChId0 :64 /unsigned ,
3029- Crc :32 /integer ,
3030- DataSize :32 /unsigned ,
3031- TrailerSize :32 /unsigned ,
3032- FilterSize :8 /unsigned ,
3033- _Reserved :24 ,
3034- MaybeFilterAndRest /binary >> = HeaderData0 ,
3035- #? MODULE {mode = # read {type = RType ,
3036- chunk_selector = Selector ,
3037- next_offset = NextChId0 } = Read0 } = State ) ->
2977+ parse_header (<<? MAGIC :4 /unsigned ,
2978+ ? VERSION :4 /unsigned ,
2979+ ChType :8 /unsigned ,
2980+ NumEntries :16 /unsigned ,
2981+ NumRecords :32 /unsigned ,
2982+ Timestamp :64 /signed ,
2983+ Epoch :64 /unsigned ,
2984+ NextChId0 :64 /unsigned ,
2985+ Crc :32 /integer ,
2986+ DataSize :32 /unsigned ,
2987+ TrailerSize :32 /unsigned ,
2988+ FilterSize :8 /unsigned ,
2989+ _Reserved :24 ,
2990+ MaybeFilterAndRest /binary >> = HeaderData0 ,
2991+ #? MODULE {mode = # read {type = RType ,
2992+ chunk_selector = Selector ,
2993+ next_offset = NextChId0 } = Read0 } = State ) ->
30382994 {ToSkip , ToSend } = select_amount_to_send (RType , Selector , ChType ,
3039- FilterSize , DataSize ,
3040- TrailerSize ),
3041- case byte_size (MaybeFilterAndRest ) of
3042- RAS when RAS >= ( ToSkip + ToSend ) ->
2995+ FilterSize , DataSize ,
2996+ TrailerSize ),
2997+ case byte_size (MaybeFilterAndRest ) >= ( ToSkip + ToSend ) of
2998+ true ->
30432999 % % we read everything we needed
30443000 {ReadAheadData , Content } =
30453001 case MaybeFilterAndRest of
30463002 <<_Skip :ToSkip /binary ,
30473003 Ctnt :ToSend /binary ,
30483004 Rest /binary >>
3049- when byte_size (Rest ) > ? HEADER_SIZE_B + ? DEFAULT_FILTER_SIZE ->
3005+ when byte_size (Rest ) > ? HEADER_SIZE_B + ? DEFAULT_FILTER_SIZE ->
3006+ % % remained is larger than 64 bytes so worth keeping
3007+ % % around
30503008 {Rest , Ctnt };
30513009 <<_Skip :ToSkip /binary ,
30523010 Ctnt :ToSend /binary ,
@@ -3059,18 +3017,25 @@ read_ahead_chunk(<<?MAGIC:4/unsigned,
30593017 ChType , NumEntries , NumRecords , Timestamp ,
30603018 Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
30613019 FilterSize );
3062- _ ->
3063- need_more_data
3020+ false ->
3021+ % % having to throw away the read ahead data here
3022+ Read1 = Read0 # read {read_ahead_data = undefined },
3023+ maybe_return_header (State #? MODULE {mode = Read1 },
3024+ HeaderData0 , MaybeFilterAndRest , undefined ,
3025+ ChType , NumEntries , NumRecords , Timestamp ,
3026+ Epoch , NextChId0 , Crc , DataSize , TrailerSize ,
3027+ FilterSize )
30643028 end ;
3065- read_ahead_chunk (_ , _ ) ->
3029+ parse_header (_ , _ ) ->
30663030 need_more_data .
30673031
3068- read_ahead_offset (LastDataSize ) ->
3069- case LastDataSize of
3070- LastDataSize when ? CAN_READ_AHEAD (LastDataSize ) ->
3032+ read_ahead_size (LastDataSize ) ->
3033+ case LastDataSize =/= undefined andalso
3034+ LastDataSize =< ? READ_AHEAD_LIMIT of
3035+ true ->
30713036 % % the previous chunk was small, try to read
30723037 % % the next chunk fully in one read
3073- % % this can save us the sendfile call later
3038+ % % this can save us a system call later
30743039 ? DEFAULT_FILTER_SIZE + ? READ_AHEAD_LIMIT ;
30753040 _ ->
30763041 % % optimistically read the default filter size.
0 commit comments