Skip to content

Commit 15174fa

Browse files
authored
Merge pull request #81 from basho/mas-i1775-fetchclocksmodrange
Mas i1775 fetchclocksmodrange
2 parents 762c7cb + 8c43ac1 commit 15174fa

File tree

3 files changed

+70
-19
lines changed

3 files changed

+70
-19
lines changed

include/rhc.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
%% -------------------------------------------------------------------
2222

2323
-define(DEFAULT_TIMEOUT, 60000).
24+
-define(AAEFOLD_TIMEOUT, 3600000).
2425

2526
-record(rhc, {ip,
2627
port,

rebar.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
{mochiweb, {git, "git://github.com/basho/mochiweb.git", {tag, "riak_kv-3.0.0"}}},
1313

1414
%% riak-erlang-client for riakc_obj
15-
{riakc, {git, "git://github.com/basho/riak-erlang-client", {tag, "riak_kv-3.0.1"}}}
15+
{riakc, {git, "git://github.com/basho/riak-erlang-client", {branch, "develop-3.0"}}}
1616
]}.
1717
{edoc_opts,
1818
[

src/rhc.erl

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
aae_merge_root/2,
7070
aae_merge_branches/3,
7171
aae_fetch_clocks/3,
72+
aae_fetch_clocks/4,
7273
aae_range_tree/7,
7374
aae_range_clocks/5,
7475
aae_range_replkeys/5,
@@ -299,7 +300,7 @@ rt_enqueue(Rhc, Bucket, Key, Options) ->
299300
aae_merge_root(Rhc, NVal) ->
300301
Url = make_cached_aae_url(Rhc, root, NVal, undefined),
301302

302-
case request(get, Url, ["200"], [], [], Rhc) of
303+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
303304
{ok, _Status, _Headers, Body} ->
304305
{struct, Response} = mochijson2:decode(Body),
305306
{ok, erlify_aae_root(Response)};
@@ -317,16 +318,16 @@ aae_merge_root(Rhc, NVal) ->
317318
aae_merge_branches(Rhc, NVal, Branches) ->
318319
Url = make_cached_aae_url(Rhc, branch, NVal, Branches),
319320

320-
case request(get, Url, ["200"], [], [], Rhc) of
321+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
321322
{ok, _Status, _Headers, Body} ->
322323
{struct, Response} = mochijson2:decode(Body),
323324
{ok, erlify_aae_branches(Response)};
324325
{error, Error} ->
325326
{error, Error}
326327
end.
327328

328-
%% @doc get the aae merged branches for the given `NVal', restricted
329-
%% to the given list of `Branches'
329+
%% @doc fetch the keys and clocks for the given `NVal', restricted
330+
%% to the given list of `Segments'
330331
-spec aae_fetch_clocks(rhc(),
331332
NVal::pos_integer(),
332333
Segments::list(pos_integer())) ->
@@ -335,7 +336,26 @@ aae_merge_branches(Rhc, NVal, Branches) ->
335336
aae_fetch_clocks(Rhc, NVal, Segments) ->
336337
Url = make_cached_aae_url(Rhc, keysclocks, NVal, Segments),
337338

338-
case request(get, Url, ["200"], [], [], Rhc) of
339+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
340+
{ok, _Status, _Headers, Body} ->
341+
{struct, Response} = mochijson2:decode(Body),
342+
{ok, erlify_aae_keysclocks(Response)};
343+
{error, Error} ->
344+
{error, Error}
345+
end.
346+
347+
%% @doc fetch the keys and clocks for the given `NVal', restricted
348+
%% to the given list of `Segments' and by a modified range
349+
-spec aae_fetch_clocks(rhc(),
350+
NVal::pos_integer(),
351+
Segments::list(pos_integer()),
352+
ModifiedRange::modified_range()) ->
353+
{ok, {keysclocks, [{{riakc_obj:bucket(), riakc_obj:key()}, binary()}]}} |
354+
{error, any()}.
355+
aae_fetch_clocks(Rhc, NVal, Segments, ModifiedRange) ->
356+
Url =
357+
make_cached_aae_url(Rhc, keysclocks, NVal, {Segments, ModifiedRange}),
358+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
339359
{ok, _Status, _Headers, Body} ->
340360
{struct, Response} = mochijson2:decode(Body),
341361
{ok, erlify_aae_keysclocks(Response)};
@@ -381,7 +401,7 @@ aae_range_tree(Rhc, BucketAndType, KeyRange,
381401
"?filter=", encode_aae_range_filter(KeyRange, SegmentFilter, ModifiedRange, HashMethod)
382402
]),
383403

384-
case request(get, Url, ["200"], [], [], Rhc) of
404+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
385405
{ok, _Status, _Headers, Body} ->
386406
{struct, Response} = mochijson2:decode(Body),
387407
{ok, erlify_aae_tree(Response)};
@@ -404,7 +424,7 @@ aae_range_clocks(Rhc, BucketAndType, KeyRange, SegmentFilter, ModifiedRange) ->
404424
"?filter=", encode_aae_range_filter(KeyRange, SegmentFilter, ModifiedRange, undefined)
405425
]),
406426

407-
case request(get, Url, ["200"], [], [], Rhc) of
427+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
408428
{ok, _Status, _Headers, Body} ->
409429
{struct, Response} = mochijson2:decode(Body),
410430
{ok, erlify_aae_keysclocks(Response)};
@@ -436,7 +456,7 @@ aae_range_replkeys(Rhc, BucketType, KeyRange, ModifiedRange, QueueName) ->
436456
encode_aae_range_filter(KeyRange, all, ModifiedRange, undefined)
437457
]),
438458

439-
case request(get, Url, ["200"], [], [], Rhc) of
459+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
440460
{ok, _Status, _Headers, Body} ->
441461
{struct, Response} = mochijson2:decode(Body),
442462
[{<<"dispatched_count">>, DispatchedCount}] = Response,
@@ -486,7 +506,7 @@ aae_find_keys(Rhc, BucketAndType, KeyRange, ModifiedRange, Query) ->
486506
"?filter=", encode_aae_find_keys_filter(KeyRange, undefined, ModifiedRange)
487507
]),
488508

489-
case request(get, Url, ["200"], [], [], Rhc) of
509+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
490510
{ok, _Status, _Headers, Body} ->
491511
{struct, Response} = mochijson2:decode(Body),
492512
{ok, erlify_aae_find_keys(Response)};
@@ -520,7 +540,7 @@ aae_find_tombs(Rhc, BucketAndType, KeyRange, SegmentFilter, ModifiedRange) ->
520540
ModifiedRange)
521541
]),
522542

523-
case request(get, Url, ["200"], [], [], Rhc) of
543+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
524544
{ok, _Status, _Headers, Body} ->
525545
{struct, Response} = mochijson2:decode(Body),
526546
{ok, erlify_aae_find_keys(Response)};
@@ -566,7 +586,7 @@ aae_reap_tombs(Rhc,
566586
ChangeMethod)
567587
]),
568588

569-
case request(get, Url, ["200"], [], [], Rhc) of
589+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
570590
{ok, _Status, _Headers, Body} ->
571591
{struct, Response} = mochijson2:decode(Body),
572592
[{<<"dispatched_count">>, DispatchedCount}] = Response,
@@ -613,7 +633,7 @@ aae_erase_keys(Rhc,
613633
ChangeMethod)
614634
]),
615635

616-
case request(get, Url, ["200"], [], [], Rhc) of
636+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
617637
{ok, _Status, _Headers, Body} ->
618638
{struct, Response} = mochijson2:decode(Body),
619639
[{<<"dispatched_count">>, DispatchedCount}] = Response,
@@ -655,7 +675,7 @@ aae_object_stats(Rhc, BucketAndType, KeyRange, ModifiedRange) ->
655675
"?filter=", encode_aae_find_keys_filter(KeyRange, undefined, ModifiedRange)
656676
]),
657677

658-
case request(get, Url, ["200"], [], [], Rhc) of
678+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
659679
{ok, _Status, _Headers, Body} ->
660680
{struct, Response} = mochijson2:decode(Body),
661681
{ok, {stats, erlify_aae_object_stats(Response)}};
@@ -681,7 +701,7 @@ aae_list_buckets(Rhc, MinNVal) when is_integer(MinNVal), MinNVal > 0 ->
681701
"?filter=", integer_to_list(MinNVal)]),
682702
aae_list_buckets(Rhc, Url);
683703
aae_list_buckets(Rhc, Url) when is_list(Url) ->
684-
case request(get, Url, ["200"], [], [], Rhc) of
704+
case request(get, Url, ["200"], [], [], Rhc, ?AAEFOLD_TIMEOUT) of
685705
{ok, _Status, _Headers, Body} ->
686706
{struct, [{<<"results">>, Response}]} = mochijson2:decode(Body),
687707
{ok, erlify_aae_buckets(Response)};
@@ -1355,25 +1375,51 @@ make_rtenqueue_url(Rhc=#rhc{}, BucketAndType, Key, Query) ->
13551375
-spec make_cached_aae_url(rhc(),
13561376
root | branch | keysclocks,
13571377
NVal :: pos_integer(),
1358-
Filter :: proplists:proplist()|undefined) ->
1378+
IDs :: list(non_neg_integer())|
1379+
undefined|
1380+
{list(non_neg_integer()),
1381+
modified_range()}) ->
13591382
iolist().
1360-
make_cached_aae_url(Rhc, Type, NVal, Filter) ->
1383+
make_cached_aae_url(Rhc, Type, NVal, undefined) ->
1384+
complete_cached_aae_url(Rhc, NVal, Type, []);
1385+
make_cached_aae_url(Rhc, Type, NVal, IDs) when is_list(IDs) ->
1386+
Filter = ["?filter=", encode_aae_cached_filter(IDs)],
1387+
complete_cached_aae_url(Rhc, NVal, Type, Filter);
1388+
make_cached_aae_url(Rhc, Type, NVal, {IDs, ModifiedRange}) ->
1389+
Filter = ["?filter=", encode_aae_cached_filter(IDs, ModifiedRange)],
1390+
complete_cached_aae_url(Rhc, NVal, Type, Filter).
1391+
1392+
1393+
complete_cached_aae_url(Rhc, NVal, Type, Filter) ->
13611394
lists:flatten(
13621395
[root_url(Rhc),
13631396
"cachedtrees", "/", %% the AAE-Fold cachedtrees prefix
13641397
"nvals", "/",
13651398
integer_to_list(NVal), "/",
13661399
atom_to_list(Type),
1367-
[ ["?filter=", encode_aae_cached_filter(Filter)] || Filter =/= undefined]
1400+
Filter
13681401
]).
13691402

1403+
1404+
13701405
%% @doc this is a list of integers. Segment IDs or Branches, but
13711406
%% either way, just json encode a list of ints
13721407
-spec encode_aae_cached_filter(list(pos_integer())) -> string().
13731408
encode_aae_cached_filter(Filter) ->
13741409
JSON = mochijson2:encode(Filter),
13751410
base64:encode_to_string(lists:flatten(JSON)).
13761411

1412+
-spec encode_aae_cached_filter(list(pos_integer()),
1413+
modified_range()) -> string().
1414+
encode_aae_cached_filter(SegmentIDs, ModifiedRange) ->
1415+
FilterElems = [EncodeFun(FilterElem) || {EncodeFun, FilterElem} <-
1416+
[{fun encode_segment_filter/1,
1417+
{SegmentIDs, large}},
1418+
{fun encode_modified_range/1,
1419+
ModifiedRange}]],
1420+
JSON = mochijson2:encode({struct, lists:flatten(FilterElems)}),
1421+
base64:encode_to_string(iolist_to_binary(JSON)).
1422+
13771423
-spec encode_aae_find_keys_filter(key_range(),
13781424
segment_filter() | undefined,
13791425
modified_range()) ->
@@ -1483,11 +1529,15 @@ make_datatype_url(Rhc, BucketAndType, Key, Query) ->
14831529

14841530
%% @doc send an ibrowse request
14851531
request(Method, Url, Expect, Headers, Body, Rhc) ->
1532+
request(Method, Url, Expect, Headers, Body, Rhc, ?DEFAULT_TIMEOUT).
1533+
1534+
request(Method, Url, Expect, Headers, Body, Rhc, Timeout) ->
14861535
AuthHeader = get_auth_header(Rhc#rhc.options),
14871536
SSLOptions = get_ssl_options(Rhc#rhc.options),
14881537
Accept = {"Accept", "multipart/mixed, */*;q=0.9"},
14891538
case ibrowse:send_req(Url, [Accept|Headers] ++ AuthHeader, Method, Body,
1490-
[{response_format, binary}] ++ SSLOptions) of
1539+
[{response_format, binary}] ++ SSLOptions,
1540+
Timeout) of
14911541
Resp={ok, Status, _, _} ->
14921542
case lists:member(Status, Expect) of
14931543
true -> Resp;

0 commit comments

Comments
 (0)