Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions src/couch/src/couch_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ close_db_if_idle(DbName) ->


init([]) ->
couch_util:set_mqd_off_heap(),
couch_util:set_mqd_off_heap(?MODULE),

% Mark pluggable storage engines as a supported feature
config:enable_feature('pluggable-storage-engines'),
Expand Down Expand Up @@ -244,7 +244,11 @@ terminate(Reason, Srv) ->
[Reason,
Srv#server{lru = redacted}]),
ets:foldl(fun(#entry{db = Db}, _) ->
couch_util:shutdown_sync(couch_db:get_pid(Db))
% Filter out any entry records for open_async
% processes that haven't finished.
if Db == undefined -> ok; true ->
couch_util:shutdown_sync(couch_db:get_pid(Db))
end
end, nil, couch_dbs),
ok.

Expand Down Expand Up @@ -386,8 +390,8 @@ handle_call(reload_engines, _From, Server) ->
{reply, ok, Server#server{engines = get_configured_engines()}};
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
true = ets:delete(couch_dbs_pid_to_name, FromPid),
handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
true = ets:delete(couch_dbs_pid_to_name, Opener),
OpenTime = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, db_open_time], OpenTime),
DbPid = couch_db:get_pid(Db),
Expand All @@ -396,7 +400,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
% db was deleted during async open
exit(DbPid, kill),
{reply, ok, Server};
[#entry{req_type = ReqType, waiters = Waiters} = Entry] ->
[#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
link(DbPid),
[gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
% Cancel the creation request if it exists.
Expand All @@ -421,27 +425,37 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
true ->
Server#server.lru
end,
{reply, ok, Server#server{lru = Lru}}
{reply, ok, Server#server{lru = Lru}};
[#entry{}] ->
% A mismatched opener pid means that this open_result message
% was in our mailbox but is now stale. Mostly ignore
% it except to ensure that the db pid is super dead.
exit(couch_db:get_pid(Db), kill),
{reply, ok, Server}
end;
handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) ->
handle_call({open_result, T0, DbName, file_exists}, From, Server);
handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) ->
handle_call({open_result, _T0, DbName, Error}, {Opener, _}, Server) ->
case ets:lookup(couch_dbs, DbName) of
[] ->
% db was deleted during async open
{reply, ok, Server};
[#entry{req_type = ReqType, waiters = Waiters} = Entry] ->
[#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
[gen_server:reply(Waiter, Error) || Waiter <- Waiters],
couch_log:info("open_result error ~p for ~s", [Error, DbName]),
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, FromPid),
true = ets:delete(couch_dbs_pid_to_name, Opener),
NewServer = case ReqType of
{create, DbName, Engine, Options, CrFrom} ->
open_async(Server, CrFrom, DbName, Engine, Options);
_ ->
Server
end,
{reply, ok, db_closed(NewServer, Entry#entry.db_options)}
{reply, ok, db_closed(NewServer, Entry#entry.db_options)};
[#entry{}] ->
% A mismatched pid means that this open_result message
% was in our mailbox and is now stale. Ignore it.
{reply, ok, Server}
end;
handle_call({open, DbName, Options}, From, Server) ->
case ets:lookup(couch_dbs, DbName) of
Expand Down
19 changes: 12 additions & 7 deletions src/couch/src/couch_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
-export([unique_monotonic_integer/0]).
-export([check_config_blacklist/1]).
-export([check_md5/2]).
-export([set_mqd_off_heap/0]).
-export([set_mqd_off_heap/1]).

-include_lib("couch/include/couch_db.hrl").

Expand Down Expand Up @@ -670,12 +670,17 @@ check_md5(Sig, Sig) -> ok;
check_md5(_, _) -> throw(md5_mismatch).


set_mqd_off_heap() ->
try
erlang:process_flag(message_queue_data, off_heap),
ok
catch error:badarg ->
ok
% Switch the calling process to off-heap message queue data unless it
% has been disabled for Module via the "off_heap_mqd" config section
% (key is the module name, default true). Always returns ok; a badarg
% raised by erlang:process_flag/2 is deliberately ignored.
set_mqd_off_heap(Module) ->
    Key = atom_to_list(Module),
    case config:get_boolean("off_heap_mqd", Key, true) of
        false ->
            ok;
        true ->
            try erlang:process_flag(message_queue_data, off_heap) of
                _Previous -> ok
            catch
                error:badarg -> ok
            end
    end.


Expand Down
173 changes: 173 additions & 0 deletions src/couch/test/couch_server_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include("../src/couch_server_int.hrl").

start() ->
Ctx = test_util:start_couch(),
Expand Down Expand Up @@ -105,3 +106,175 @@ bad_engine_option_test_() ->
t_bad_engine_option() ->
Resp = couch_server:create(?tempdb(), [{engine, <<"cowabunga!">>}]),
?assertEqual(Resp, {error, {invalid_engine_extension, <<"cowabunga!">>}}).


% EUnit setup fixture: start_interleaved/0 builds the environment,
% stop_interleaved/1 tears it down, and make_interleaved_requests/1
% turns the setup result into the list of tests to run.
interleaved_requests_test_() ->
    Setup = fun start_interleaved/0,
    Cleanup = fun stop_interleaved/1,
    Instantiator = fun make_interleaved_requests/1,
    {setup, Setup, Cleanup, Instantiator}.


% Fixture setup. Mocks couch_db:start_link/4 (with passthrough) so that
% opening the test database blocks until the opener process receives
% `go`, then starts couch. Returns {Ctx, TestDbName}.
start_interleaved() ->
    TestDbName = ?tempdb(),
    meck:new(couch_db, [passthrough]),
    StartLink = fun
        (Engine, DbName, Filename, Options) when DbName =:= TestDbName ->
            % Park the opener until the test releases it.
            receive
                go -> ok
            end,
            Result = meck:passthrough([Engine, DbName, Filename, Options]),
            % We're unlinking and sending a delayed
            % EXIT signal so that we can mimic a specific
            % message order in couch_server. On a test machine
            % this is a big race condition which affects the
            % ability to induce the bug.
            case Result of
                {ok, Db} ->
                    DbPid = couch_db:get_pid(Db),
                    unlink(DbPid),
                    erlang:send_after(
                        2000,
                        whereis(couch_server),
                        {'EXIT', DbPid, killed}
                    );
                _ ->
                    ok
            end,
            Result;
        (Engine, DbName, Filename, Options) ->
            % Any other database is opened without interference.
            meck:passthrough([Engine, DbName, Filename, Options])
    end,
    meck:expect(couch_db, start_link, StartLink),
    Ctx = test_util:start_couch(),
    {Ctx, TestDbName}.


% Fixture teardown for the {Ctx, TestDbName} pair returned by
% start_interleaved/0.
stop_interleaved({Ctx, TestDbName}) ->
% Delete the test database while couch is still running.
couch_server:delete(TestDbName, [?ADMIN_CTX]),
% Remove the meck mocks before shutting couch down.
meck:unload(),
test_util:stop_couch(Ctx).


% Build the list of test funs for the fixture; only the database name
% from the setup result is needed.
make_interleaved_requests({_Ctx, TestDbName}) ->
    CreateDeleteOpen = fun() ->
        t_interleaved_create_delete_open(TestDbName)
    end,
    [CreateDeleteOpen].


% Regression test: queue a create, a delete and an open request for the
% same database so that the opener's successful open_result reaches
% couch_server only after the delete and open requests are already in
% its mailbox. Asserts the three replies and that couch_server does not
% crash while working through that mailbox.
t_interleaved_create_delete_open(DbName) ->
{CrtRef, DelRef, OpenRef} = {make_ref(), make_ref(), make_ref()},
CrtMsg = {'$gen_call', {self(), CrtRef}, {create, DbName, [?ADMIN_CTX]}},
DelMsg = {'$gen_call', {self(), DelRef}, {delete, DbName, [?ADMIN_CTX]}},
OpenMsg = {'$gen_call', {self(), OpenRef}, {open, DbName, [?ADMIN_CTX]}},

% Get the current couch_server pid so we're sure
% to not end up messaging two different pids
CouchServer = whereis(couch_server),

% Start our first instance that will succeed in
% an invalid state. Notice that the opener pid
% spawned by couch_server:open_async/5 will halt
% in our meck expect function waiting for a message.
%
% We're using raw message passing here so that we don't
% have to coordinate multiple processes for this test.
CouchServer ! CrtMsg,
{ok, Opener} = get_opener_pid(DbName),

% We have to suspend couch_server so that we can enqueue
% our next requests and let the opener finish processing.
erlang:suspend_process(CouchServer),

% Since couch_server is suspended, this delete request won't
% be processed until after the opener has sent its
% successful open response via gen_server:call/3
CouchServer ! DelMsg,

% This open request will be in the queue after the
% delete request but before the gen_server:call/3
% message which will establish the mixed up state
% in the couch_dbs ets table
CouchServer ! OpenMsg,

% First release the opener pid so it can continue
% working while we tweak meck
Opener ! go,

% Replace our expect call to meck so that the OpenMsg
% isn't blocked on the receive
meck:expect(couch_db, start_link, fun(Engine, DbName1, Filename, Options) ->
meck:passthrough([Engine, DbName1, Filename, Options])
end),

% Wait for the '$gen_call' message from Opener to arrive
% in couch_server's mailbox
ok = wait_for_open_async_result(CouchServer, Opener),

% Now monitor and resume the couch_server and assert that
% couch_server does not crash while processing OpenMsg
CSRef = erlang:monitor(process, CouchServer),
erlang:resume_process(CouchServer),
check_monitor_not_triggered(CSRef),

% The create response is expected to return not_found
% due to the delete request canceling the async opener
% pid and sending not_found to all waiters unconditionally
?assertEqual({CrtRef, not_found}, get_next_message()),

% Our delete request was processed normally
?assertEqual({DelRef, ok}, get_next_message()),

% The db was deleted thus it should be not found
% when we try and open it.
?assertMatch({OpenRef, {not_found, no_db_file}}, get_next_message()),

% And finally assert that couch_server is still
% alive.
?assert(is_process_alive(CouchServer)),
check_monitor_not_triggered(CSRef).


% Poll the couch_dbs ets table via test_util:wait/1 until an entry for
% DbName exists, then return {ok, Pid} with the pid stored in it.
get_opener_pid(DbName) ->
    test_util:wait(fun() ->
        case ets:lookup(couch_dbs, DbName) of
            [] ->
                % No entry yet, keep polling.
                wait;
            [#entry{pid = Opener}] ->
                {ok, Opener}
        end
    end).


% Poll CouchServer's mailbox via test_util:wait/1 until it contains a
% successful open_result '$gen_call' sent by Opener; returns ok once
% such a message is queued.
wait_for_open_async_result(CouchServer, Opener) ->
    % Opener is matched inside a case (not a fun head) so the bound
    % pid is compared rather than shadowed.
    IsOpenResult = fun(Queued) ->
        case Queued of
            {'$gen_call', {Opener, _}, {open_result, _, _, {ok, _}}} ->
                true;
            _ ->
                false
        end
    end,
    test_util:wait(fun() ->
        {messages, Mailbox} = erlang:process_info(CouchServer, messages),
        case lists:any(IsOpenResult, Mailbox) of
            true -> ok;
            false -> wait
        end
    end).


% Wait up to 100 ms for a 'DOWN' message for monitor Ref. Raises
% error({monitor_triggered, Reason}) if one arrives, otherwise
% returns ok.
check_monitor_not_triggered(Ref) ->
    receive
        {'DOWN', Ref, _Type, _Object, Why} ->
            erlang:error({monitor_triggered, Why})
    after 100 ->
        ok
    end.


% Return the next message in the caller's mailbox, whatever it is.
% Raises error(timeout) if nothing arrives within 5 seconds.
get_next_message() ->
    receive
        Next -> Next
    after 5000 ->
        erlang:error(timeout)
    end.
2 changes: 1 addition & 1 deletion src/couch_log/src/couch_log_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ log(Entry) ->


init(_) ->
couch_util:set_mqd_off_heap(),
couch_util:set_mqd_off_heap(?MODULE),
process_flag(trap_exit, true),
{ok, #st{
writer = couch_log_writer:init()
Expand Down
2 changes: 1 addition & 1 deletion src/ddoc_cache/src/ddoc_cache_lru.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ refresh(DbName, DDocIds) ->


init(_) ->
couch_util:set_mqd_off_heap(),
couch_util:set_mqd_off_heap(?MODULE),
process_flag(trap_exit, true),
BaseOpts = [public, named_table],
CacheOpts = [
Expand Down
2 changes: 1 addition & 1 deletion src/mem3/src/mem3_shards.erl
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ handle_config_terminate(_Server, _Reason, _State) ->
erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).

init([]) ->
couch_util:set_mqd_off_heap(),
couch_util:set_mqd_off_heap(?MODULE),
ets:new(?SHARDS, [
bag,
public,
Expand Down
2 changes: 1 addition & 1 deletion src/rexi/src/rexi_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ start_link(ServerId) ->
gen_server:start_link({local, ServerId}, ?MODULE, [], []).

init([]) ->
couch_util:set_mqd_off_heap(),
couch_util:set_mqd_off_heap(?MODULE),
{ok, #st{}}.

handle_call(get_errors, _From, #st{errors = Errors} = St) ->
Expand Down