Skip to content

Commit

Permalink
Optimize parser by using nested parser state
Browse files Browse the repository at this point in the history
When parsing a nested array in chunks efficiently, the parser state
(continuation data) needs to be a nested structure. The parser is
restructured.

Fixes wooga#127
  • Loading branch information
zuiderkwast committed Jun 22, 2021
1 parent 40fb5bc commit 85d0824
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 376 deletions.
13 changes: 9 additions & 4 deletions include/eredis.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@
%% Continuation data is whatever data returned by any of the parse
%% functions. This is used to continue where we left off the next time
%% the user calls parse/2.
-type continuation_data() :: any().
-type parser_state() :: status_continue | bulk_continue | multibulk_continue | error_continue.
-type continuation_data() ::
start |
{status_continue, Acc :: binary()} |
{error_continue, Acc :: binary()} |
{bulk_size, Acc :: binary()} |
{multibulk_size, Acc :: binary()} |
{bulk_continue, BytesLeft :: integer(), Acc :: binary()} |
{multibulk_continue, NumLeft :: integer(), Acc :: list()}.

%% Internal types
-ifdef(OTP_RELEASE). % OTP >= 21
Expand All @@ -42,8 +48,7 @@
%% Internal parser state. Is returned from parse/2 and must be
%% included on the next calls to parse/2.
-record(pstate, {
state = undefined :: parser_state() | undefined,
continuation_data :: continuation_data() | undefined
states = [] :: [continuation_data()]
}).

-define(NL, "\r\n").
Expand Down
327 changes: 102 additions & 225 deletions src/eredis_parser.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
-export([init/0, parse/2]).

%% Exported for testing
-export([parse_bulk/1, parse_bulk/2,
parse_multibulk/1, parse_multibulk/2, buffer_create/0, buffer_create/1]).
%% -export([parse_bulk/1, parse_bulk/2,
%% parse_multibulk/1, parse_multibulk/2, buffer_create/0, buffer_create/1]).

%%
%% API
Expand Down Expand Up @@ -66,247 +66,124 @@ init() ->

%% Parser in initial state, the data we receive will be the beginning
%% of a response
parse(#pstate{state = undefined} = State, NewData) ->
parse(#pstate{states = []}, NewData) ->
return(do_parse(start, NewData), []);
parse(#pstate{states = [State | States]}, NewData) ->
return(do_parse(State, NewData), States).

%% Combines the result of do_parse/2 with the nested states of parse/2.
return({Tag, Value, <<>>}, []) when Tag =:= ok; Tag =:= error ->
{Tag, Value, #pstate{}};
return({Tag, Value, RestData}, []) when Tag =:= ok; Tag =:= error ->
{Tag, Value, RestData, #pstate{}};
return({Tag, Value, RestData}, [{multibulk_continue, NumLeft, Acc} | States])
when Tag =:= ok; Tag =:= error ->
NewStates = [{multibulk_continue, NumLeft - 1, [Value | Acc]} | States],
parse(#pstate{states = NewStates}, RestData);
return({continue, Continue}, States) ->
{continue, #pstate{states = [Continue | States]}};
return({nested, State, Data}, States) ->
%% We're in a multibulk and need to parse a new element
parse(#pstate{states = [start, State | States]}, Data);
return({error, _Reason} = ParseError, _States) ->
ParseError.

%% Parses a value. State is not nested here.
-spec do_parse(continuation_data(), NewData :: binary()) ->
{ok, Value :: any(), RestData :: binary()} |
{error, Message :: binary(), RestData :: binary()} |
{continue, continuation_data()} |
{nested, continuation_data(), RestData :: binary()} |
{error, unknown_response}.
do_parse(start, <<Type, Data/binary>>) ->
%% Look at the first byte to get the type of reply
case NewData of
%% Status
<<$+, Data/binary>> ->
return_result(parse_simple(Data), State, status_continue);
case Type of
%% Status (AKA simple string)
$+ ->
do_parse({status_continue, <<>>}, Data);

%% Error
<<$-, Data/binary>> ->
return_error(parse_simple(Data), State, error_continue);
$- ->
do_parse({error_continue, <<>>}, Data);

%% Integer reply
<<$:, Data/binary>> ->
return_result(parse_simple(Data), State, status_continue);
%% Integer reply (returned as binary)
$: ->
do_parse({status_continue, <<>>}, Data);

%% Multibulk
<<$*, _Rest/binary>> ->
return_result(parse_multibulk(NewData), State, multibulk_continue);
%% Multibulk (array)
$* ->
do_parse({multibulk_size, <<>>}, Data);

%% Bulk
<<$$, _Rest/binary>> ->
return_result(parse_bulk(NewData), State, bulk_continue);
%% Bulk (string)
$$ ->
do_parse({bulk_size, <<>>}, Data);

_ ->
%% TODO: Handle the case where we start parsing a new
%% response, but cannot make any sense of it
{error, unknown_response}
end;

%% The following clauses all match on different continuation states

parse(#pstate{state = bulk_continue,
continuation_data = ContinuationData} = State, NewData) ->
return_result(parse_bulk(ContinuationData, NewData), State, bulk_continue);

parse(#pstate{state = multibulk_continue,
continuation_data = ContinuationData} = State, NewData) ->
return_result(parse_multibulk(ContinuationData, NewData), State, multibulk_continue);

parse(#pstate{state = status_continue,
continuation_data = ContinuationData} = State, NewData) ->
return_result(parse_simple(ContinuationData, NewData), State, status_continue);

parse(#pstate{state = error_continue,
continuation_data = ContinuationData} = State, NewData) ->
return_error(parse_simple(ContinuationData, NewData), State, error_continue).

%%
%% MULTIBULK
%%

parse_multibulk(Data) when is_binary(Data) -> parse_multibulk(buffer_create(Data));

parse_multibulk(Buffer) ->
do_parse({StateTag, Acc}, Data) when StateTag =:= status_continue;
StateTag =:= error_continue ->
Buffer = <<Acc/binary, Data/binary>>,
case get_newline_pos(Buffer) of
undefined ->
{continue, {incomplete_size, Buffer}};
{continue, {StateTag, Buffer}};
NewlinePos ->
OffsetNewlinePos = NewlinePos - 1,
<<$*, Size:OffsetNewlinePos/binary, ?NL, Bulk/binary>> = buffer_to_binary(Buffer),
IntSize = list_to_integer(binary_to_list(Size)),

do_parse_multibulk(IntSize, buffer_create(Bulk))
end.

%% Size of multibulk was incomplete, try again
parse_multibulk({incomplete_size, Buffer}, NewData0) ->
NewBuffer = buffer_append(Buffer, NewData0),
parse_multibulk(NewBuffer);

%% Ran out of data inside do_parse_multibulk in parse_bulk, must
%% continue traversing the bulks
parse_multibulk({in_parsing_bulks, Count, Buffer, Acc},
NewData0) ->
NewBuffer = buffer_append(Buffer, NewData0),

%% Continue where we left off
do_parse_multibulk(Count, NewBuffer, Acc).

%% @doc: Parses the given number of bulks from Data. If Data does not
%% contain enough bulks, {continue, ContinuationData} is returned with
%% enough information to start parsing with the correct count and
%% accumulated data.
do_parse_multibulk(Count, Buffer) ->
do_parse_multibulk(Count, Buffer, []).

do_parse_multibulk(-1, Buffer, []) ->
{ok, undefined, buffer_to_binary(Buffer)};
do_parse_multibulk(0, Buffer, Acc) ->
{ok, lists:reverse(Acc), buffer_to_binary(Buffer)};
do_parse_multibulk(Count, Buffer, Acc) ->
case buffer_size(Buffer) == 0 of
true -> {continue, {in_parsing_bulks, Count, buffer_create(), Acc}};
false ->
%% Try parsing the first bulk in Data, if it works, we get the
%% extra data back that was not part of the bulk which we can
%% recurse on. If the bulk does not contain enough data, we
%% return with a continuation and enough data to pick up where we
%% left off. In the continuation we will get more data
%% automagically in Data, so parsing the bulk might work.
case parse_bulk(Buffer) of
{ok, Value, Rest} ->
do_parse_multibulk(Count - 1, buffer_create(Rest), [Value | Acc]);
{continue, _} ->
{continue, {in_parsing_bulks, Count, Buffer, Acc}}
end
end.

%%
%% BULK
%%

parse_bulk(Data) when is_binary(Data) -> parse_bulk(buffer_create(Data));

parse_bulk(Buffer) ->
case buffer_hd(Buffer) of
[$*] -> parse_multibulk(Buffer);
[$+] -> parse_simple(buffer_tl(Buffer));
[$-] -> parse_simple(buffer_tl(Buffer));
[$:] -> parse_simple(buffer_tl(Buffer));
[$$] -> do_parse_bulk(Buffer)
end.

%% Bulk, at beginning of response
do_parse_bulk(Buffer) ->
<<Value:NewlinePos/binary, ?NL, RestData/binary>> = Buffer,
Tag = case StateTag of
status_continue -> ok;
error_continue -> error
end,
{Tag, Value, RestData}
end;
do_parse({StateTag, Acc}, Data) when StateTag =:= bulk_size;
StateTag =:= multibulk_size ->
%% Find the position of the first terminator, everything up until
%% this point contains the size specifier. If we cannot find it,
%% we received a partial response and need more data
Buffer = <<Acc/binary, Data/binary>>,
case get_newline_pos(Buffer) of
undefined ->
{continue, {incomplete_size, Buffer}};
NewlinePos ->
OffsetNewlinePos = NewlinePos - 1, % Take into account the first $
<<$$, Size:OffsetNewlinePos/binary, Bulk/binary>> = buffer_to_binary(Buffer),
IntSize = list_to_integer(binary_to_list(Size)),

if
%% Nil response from redis
IntSize =:= -1 ->
<<?NL, Rest/binary>> = Bulk,
{ok, undefined, Rest};
%% We have enough data for the entire bulk
size(Bulk) - (size(<<?NL>>) * 2) >= IntSize ->
<<?NL, Value:IntSize/binary, ?NL, Rest/binary>> = Bulk,
{ok, Value, Rest};
true ->
%% Need more data, so we send the bulk without the
%% size specifier to our future self
{continue, {IntSize, buffer_create(Bulk)}}
end
end.

%% Bulk, continuation from partial bulk size
parse_bulk({incomplete_size, Buffer}, NewData0) ->
NewBuffer = buffer_append(Buffer, NewData0),
parse_bulk(NewBuffer);

%% Bulk, continuation from partial bulk value
parse_bulk({IntSize, Buffer0}, Data) ->
Buffer = buffer_append(Buffer0, Data),

case buffer_size(Buffer) - (size(<<?NL>>) * 2) >= IntSize of
true ->
<<?NL, Value:IntSize/binary, ?NL, Rest/binary>> = buffer_to_binary(Buffer),
{ok, Value, Rest};
false ->
{continue, {IntSize, Buffer}}
end.


%%
%% SIMPLE REPLIES
%%
%% Handles replies on the following format:
%% TData\r\n
%% Where T is a type byte, like '+', '-', ':'. Data is terminated by \r\n

%% @doc: Parse simple replies. Data must not contain type
%% identifier. Type must be handled by the caller.
parse_simple(Data) when is_binary(Data) -> parse_simple(buffer_create(Data));

parse_simple(Buffer) ->
case get_newline_pos(Buffer) of
undefined ->
{continue, {incomplete_simple, Buffer}};
%% Incomplete size
{continue, {StateTag, Buffer}};
NewlinePos ->
<<Value:NewlinePos/binary, ?NL, Rest/binary>> = buffer_to_binary(Buffer),
{ok, Value, Rest}
end.

parse_simple({incomplete_simple, Buffer}, NewData0) ->
NewBuffer = buffer_append(Buffer, NewData0),
parse_simple(NewBuffer).

%%
%% INTERNAL HELPERS
%%
get_newline_pos({B, _}) ->
case re:run(B, ?NL) of
{match, [{Pos, _}]} -> Pos;
<<Size:NewlinePos/binary, ?NL, RestData/binary>> = Buffer,
IntSize = binary_to_integer(Size),
NextState = case StateTag of
bulk_size -> {bulk_continue, IntSize, <<>>};
multibulk_size -> {multibulk_continue, IntSize, []}
end,
do_parse(NextState, RestData)
end;
do_parse({bulk_continue, -1, <<>>}, Data) ->
%% Nil (AKA null) string
{ok, undefined, Data};
do_parse({bulk_continue, -1, Acc}, <<"\n", RestData/binary>>) when byte_size(Acc) > 0 ->
%% It's only half of the "\r\n" we're waiting for (unlikely case)
BulkSize = byte_size(Acc) - 1,
<<Bulk:BulkSize/binary, "\r">> = Acc,
{ok, Bulk, RestData};
do_parse({bulk_continue, RemainingSize, Acc}, Data)
when byte_size(Data) >= RemainingSize + length(?NL) ->
%% We have enough data for the entire bulk
<<RemainingBulk:RemainingSize/binary, ?NL, RestData/binary>> = Data,
Bulk = <<Acc/binary, RemainingBulk/binary>>,
{ok, Bulk, RestData};
do_parse({bulk_continue, RemainingSize, Acc}, Data) ->
NewRemainingSize = RemainingSize - byte_size(Data),
NewAcc = <<Acc/binary, Data/binary>>,
{continue, {bulk_continue, NewRemainingSize, NewAcc}};
do_parse({multibulk_continue, -1, []}, Data) ->
%% Nil (AKA null) array
{ok, undefined, Data};
do_parse({multibulk_continue, 0, Acc}, Data) ->
{ok, lists:reverse(Acc), Data};
do_parse({multibulk_continue, _RemainingItems, _Acc} = State, <<>>) ->
{continue, State};
do_parse({multibulk_continue, _RemainingItems, _Acc} = State, Data) ->
{nested, State, Data}.

get_newline_pos(B) ->
case binary:match(B, <<?NL>>) of
{Pos, _Len} -> Pos;
nomatch -> undefined
end.

buffer_create() ->
{[], 0}.

buffer_create(Data) ->
{[Data], byte_size(Data)}.

buffer_append({List, Size}, Binary) ->
NewList = case List of
[] -> [Binary];
[Head | Tail] -> [Head, Tail, Binary]
end,
{NewList, Size + byte_size(Binary)}.

buffer_hd({[<<Char, _/binary>> | _], _}) -> [Char];
buffer_hd({[], _}) -> [].

buffer_tl({[<<_, RestBin/binary>> | Rest], Size}) -> {[RestBin | Rest], Size - 1}.

buffer_to_binary({List, _}) -> iolist_to_binary(List).

buffer_size({_, Size}) -> Size.

%% @doc: Helper for handling the result of parsing. Will update the
%% parser state with the continuation of given name if necessary.
return_result({ok, Value, <<>>}, _State, _StateName) ->
{ok, Value, init()};
return_result({ok, Value, Rest}, _State, _StateName) ->
{ok, Value, Rest, init()};
return_result({continue, ContinuationData}, State, StateName) ->
{continue, State#pstate{state = StateName, continuation_data = ContinuationData}}.

%% @doc: Helper for returning an error. Uses return_result/3 and just transforms the {ok, ...} tuple into an error tuple
return_error(Result, State, StateName) ->
case return_result(Result, State, StateName) of
{ok, Value, ParserState} ->
{error, Value, ParserState};
{ok, Value, Rest, ParserState} ->
{error, Value, Rest, ParserState};
Res ->
Res
end.
Loading

0 comments on commit 85d0824

Please sign in to comment.