Skip to content

Commit

Permalink
Handover merge actions in batches
Browse files Browse the repository at this point in the history
This is done to limit the size of terms sent over the distributed erlang
connection. Not doing so might result in connection loss due to inter-
node heartbeats timing out.
  • Loading branch information
schlagert committed Jan 8, 2016
1 parent d6748a6 commit cf6752d
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
8 changes: 4 additions & 4 deletions include/lbm_kv.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
-define(LBM_KV_COOKIE, {{0,0,0}, lbm_kv}).

%% The options used in `mnesia:create_table/2'.
-define(LBM_KV_TABLE_OPTS(), [{record_name, lbm_kv},
{attributes, record_info(fields, lbm_kv)},
{cookie, ?LBM_KV_COOKIE},
{ram_copies, [node() | nodes()]}]).
-define(LBM_KV_TABLE_OPTS, [{record_name, lbm_kv},
{attributes, record_info(fields, lbm_kv)},
{cookie, ?LBM_KV_COOKIE},
{ram_copies, [node() | nodes()]}]).

%% Default timeout for RPC calls.
-define(LBM_KV_RPC_TIMEOUT, 2000).
Expand Down
2 changes: 1 addition & 1 deletion src/lbm_kv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
%%------------------------------------------------------------------------------
-spec create(table()) -> ok | {error, term()}.
create(Table) ->
case mnesia:create_table(Table, ?LBM_KV_TABLE_OPTS()) of
case mnesia:create_table(Table, ?LBM_KV_TABLE_OPTS) of
{atomic, ok} ->
await_table(Table);
{aborted, {already_exists, Table}} ->
Expand Down
41 changes: 32 additions & 9 deletions src/lbm_kv_merge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ merge_table(Remote, Table) ->
Keys = get_all_keys([node(), Remote], Table),
case merge_entries(Keys, node(), Remote, Table, {[], []}) of
{ok, {LocalActions, RemoteActions}} ->
case rpc_actions(Remote, RemoteActions) of
ok -> rpc_actions(node(), LocalActions);
case rpc_merge(Remote, RemoteActions) of
ok -> handle_actions(LocalActions);
Error -> Error
end;
Error ->
Expand Down Expand Up @@ -195,7 +195,7 @@ user_callback(Table, Key, LRecord, RRecord) when is_atom(Table) ->
noop
catch Class:Exception ->
error_logger:error_msg(
"~w:handle_conflict/3 raised ~w on key ~w: ~p",
"~w:handle_conflict/3 raised ~w on key ~w: ~w",
[Table, Class, Key, Exception]),
{error, {diverged, Table, Key}}
end;
Expand Down Expand Up @@ -233,17 +233,40 @@ get_all_keys(Nodes, Table) ->
%% when a call is local and optimizes that.
%%------------------------------------------------------------------------------
rpc_mnesia(Node, Function, Args) ->
check_rpc(rpc:call(Node, mnesia, Function, Args, ?LBM_KV_RPC_TIMEOUT)).
Timeout = application:get_env(lbm_kv, rpc_timeout, ?LBM_KV_RPC_TIMEOUT),
check_rpc(rpc:call(Node, mnesia, Function, Args, Timeout)).

%%------------------------------------------------------------------------------
%% @private
%% Make an RPC call to this module on `Node' handing over merge actions.
%% Make subsequent RPC calls to this module on `Node' handing over merge
%% actions in batches of a configurable size. This is done to limit the size
%% of terms sent over the distributed erlang connection. Not doing so might
%% result in connection loss due to inter-node heartbeats timing out.
%%------------------------------------------------------------------------------
rpc_actions(_Node, []) ->
rpc_merge(Node, Actions) ->
BatchSize = application:get_env(lbm_kv, batch_size, 10),
Timeout = application:get_env(lbm_kv, rpc_timeout, ?LBM_KV_RPC_TIMEOUT),
rpc_merge(Node, Actions, BatchSize, Timeout + BatchSize * 200).
rpc_merge(_Node, [], _BatchSize, _Timeout) ->
ok;
rpc_actions(Node, Actions) ->
Timeout = ?LBM_KV_RPC_TIMEOUT + length(Actions) * 100,
check_rpc(rpc:call(Node, ?MODULE, handle_actions, [Actions], Timeout)).
rpc_merge(Node, Actions, BatchSize, Timeout) ->
{Current, Remaining} = split(BatchSize, Actions),
case rpc:call(Node, ?MODULE, handle_actions, [Current], Timeout) of
ok -> rpc_merge(Node, Remaining, BatchSize, Timeout);
Result -> check_rpc(Result)
end.

%%------------------------------------------------------------------------------
%% @private
%% Similar to {@link lists:split/2}. However, it is not an error when `N'
%% exceeds the length of the list.
%%------------------------------------------------------------------------------
split(N, List) ->
try
lists:split(N, List)
catch
error:badarg -> {List, []}
end.

%%------------------------------------------------------------------------------
%% @private
Expand Down
8 changes: 5 additions & 3 deletions src/lbm_kv_mon.erl
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ connect_nodes(Node, LocalTables, LocalOnlyTables, RemoteOnlyTables) ->

?INFO("Successfully connected to ~s~n", [Node]);
Error ->
?ERR("Failed to connect to ~s: ~p~n", [Node, Error]),
?ERR("Failed to connect to ~s: ~w~n", [Node, Error]),

%% last resort
handle_unresolved_conflict()
Expand Down Expand Up @@ -302,7 +302,8 @@ reduce(Node) ->
{atomic, ok} ->
?INFO("Successfully disconnected from ~s~n", [Node]);
Error = {aborted, _} ->
?ERR("Failed to remove schema from ~s: ~p~n", [Node, Error])
?LBM_KV_DBG("Failed to remove schema from ~s: ~w~n",
[Node, Error])
end;
false ->
%% The disconnected node is not part of the seen `db_nodes' anymore,
Expand Down Expand Up @@ -378,7 +379,8 @@ get_db_nodes(Node) -> rpc_mnesia(Node, system_info, [db_nodes]).
%% when a call is local and optimizes that.
%%------------------------------------------------------------------------------
rpc_mnesia(Node, Function, Args) ->
case rpc:call(Node, mnesia, Function, Args, ?LBM_KV_RPC_TIMEOUT) of
Timeout = application:get_env(lbm_kv, rpc_timeout, ?LBM_KV_RPC_TIMEOUT),
case rpc:call(Node, mnesia, Function, Args, Timeout) of
{badrpc, Reason} -> {error, Reason};
Result -> Result
end.
Expand Down

0 comments on commit cf6752d

Please sign in to comment.