Skip to content

Commit 944ad08

Browse files
authored
RCBC-387: implement replica reads (#48)
1 parent 5a732a9 commit 944ad08

File tree

5 files changed

+153
-5
lines changed

5 files changed

+153
-5
lines changed

Gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ group :development do
2727
gem "minitest"
2828
gem "minitest-reporters"
2929
gem "rack"
30-
gem "rails"
30+
gem "rails", "~> 7.0.3"
3131
gem "reek"
3232
gem "rubocop"
3333
gem "rubocop-minitest"

ext/couchbase.cxx

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ cb_map_error_code(const couchbase::key_value_error_context& ctx, const std::stri
924924
{
925925
VALUE exc = cb_map_error_code(ctx.ec(), message);
926926
VALUE error_context = rb_hash_new();
927-
std::string error(fmt::format("{}, {}", ctx.ec().value(), ctx.ec().message()));
927+
std::string error(ctx.ec().message());
928928
rb_hash_aset(error_context, rb_id2sym(rb_intern("error")), cb_str_new(error));
929929
rb_hash_aset(error_context, rb_id2sym(rb_intern("id")), cb_str_new(ctx.id()));
930930
rb_hash_aset(error_context, rb_id2sym(rb_intern("scope")), cb_str_new(ctx.scope()));
@@ -2135,6 +2135,95 @@ cb_Backend_document_get(VALUE self, VALUE bucket, VALUE scope, VALUE collection,
21352135
return Qnil;
21362136
}
21372137

2138+
struct passthrough_transcoder {
2139+
using document_type = couchbase::codec::encoded_value;
2140+
2141+
static auto decode(const couchbase::codec::encoded_value& data) -> document_type
2142+
{
2143+
return data;
2144+
}
2145+
};
2146+
2147+
template<>
2148+
struct couchbase::codec::is_transcoder<passthrough_transcoder> : public std::true_type {
2149+
};
2150+
2151+
static VALUE
2152+
cb_Backend_document_get_any_replica(VALUE self, VALUE bucket, VALUE scope, VALUE collection, VALUE id, VALUE options)
2153+
{
2154+
const auto& core = cb_backend_to_cluster(self);
2155+
2156+
Check_Type(bucket, T_STRING);
2157+
Check_Type(scope, T_STRING);
2158+
Check_Type(collection, T_STRING);
2159+
Check_Type(id, T_STRING);
2160+
2161+
try {
2162+
couchbase::get_any_replica_options opts;
2163+
cb_options_set_timeout(opts, options);
2164+
2165+
auto f = couchbase::cluster(core)
2166+
.bucket(cb_string_new(bucket))
2167+
.scope(cb_string_new(scope))
2168+
.collection(cb_string_new(collection))
2169+
.get_any_replica(cb_string_new(id), opts);
2170+
auto [ctx, resp] = cb_wait_for_future(f);
2171+
if (ctx.ec()) {
2172+
cb_throw_error_code(ctx, "unable to get replica of the document");
2173+
}
2174+
2175+
auto value = resp.content_as<passthrough_transcoder>();
2176+
VALUE res = rb_hash_new();
2177+
rb_hash_aset(res, rb_id2sym(rb_intern("content")), cb_str_new(value.data));
2178+
rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(resp.cas()));
2179+
rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(value.flags));
2180+
return res;
2181+
} catch (const ruby_exception& e) {
2182+
rb_exc_raise(e.exception_object());
2183+
}
2184+
return Qnil;
2185+
}
2186+
2187+
static VALUE
2188+
cb_Backend_document_get_all_replicas(VALUE self, VALUE bucket, VALUE scope, VALUE collection, VALUE id, VALUE options)
2189+
{
2190+
const auto& core = cb_backend_to_cluster(self);
2191+
2192+
Check_Type(bucket, T_STRING);
2193+
Check_Type(scope, T_STRING);
2194+
Check_Type(collection, T_STRING);
2195+
Check_Type(id, T_STRING);
2196+
2197+
try {
2198+
couchbase::get_all_replicas_options opts;
2199+
cb_options_set_timeout(opts, options);
2200+
2201+
auto f = couchbase::cluster(core)
2202+
.bucket(cb_string_new(bucket))
2203+
.scope(cb_string_new(scope))
2204+
.collection(cb_string_new(collection))
2205+
.get_all_replicas(cb_string_new(id), opts);
2206+
auto [ctx, resp] = cb_wait_for_future(f);
2207+
if (ctx.ec()) {
2208+
cb_throw_error_code(ctx, "unable to get all replicas for the document");
2209+
}
2210+
2211+
VALUE res = rb_ary_new_capa(static_cast<long>(resp.size()));
2212+
for (const auto& entry : resp) {
2213+
VALUE response = rb_hash_new();
2214+
auto value = entry.content_as<passthrough_transcoder>();
2215+
rb_hash_aset(response, rb_id2sym(rb_intern("content")), cb_str_new(value.data));
2216+
rb_hash_aset(response, rb_id2sym(rb_intern("cas")), cb_cas_to_num(entry.cas()));
2217+
rb_hash_aset(response, rb_id2sym(rb_intern("flags")), UINT2NUM(value.flags));
2218+
rb_ary_push(res, response);
2219+
}
2220+
return res;
2221+
} catch (const ruby_exception& e) {
2222+
rb_exc_raise(e.exception_object());
2223+
}
2224+
return Qnil;
2225+
}
2226+
21382227
static VALUE
21392228
cb_Backend_document_get_multi(VALUE self, VALUE keys, VALUE options)
21402229
{
@@ -7671,6 +7760,8 @@ init_backend(VALUE mCouchbase)
76717760
rb_define_method(cBackend, "ping", VALUE_FUNC(cb_Backend_ping), 2);
76727761

76737762
rb_define_method(cBackend, "document_get", VALUE_FUNC(cb_Backend_document_get), 5);
7763+
rb_define_method(cBackend, "document_get_any_replica", VALUE_FUNC(cb_Backend_document_get_any_replica), 5);
7764+
rb_define_method(cBackend, "document_get_all_replicas", VALUE_FUNC(cb_Backend_document_get_all_replicas), 5);
76747765
rb_define_method(cBackend, "document_get_multi", VALUE_FUNC(cb_Backend_document_get_multi), 2);
76757766
rb_define_method(cBackend, "document_get_projected", VALUE_FUNC(cb_Backend_document_get_projected), 5);
76767767
rb_define_method(cBackend, "document_get_and_lock", VALUE_FUNC(cb_Backend_document_get_and_lock), 6);

lib/couchbase/collection.rb

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,46 @@ def get_and_touch(id, expiry, options = Options::GetAndTouch.new)
168168
# @param [Options::GetAllReplicas] options request customization
169169
#
170170
# @return [Array<GetReplicaResult>]
171-
def get_all_replicas(id, options = Options::GetAllReplicas.new) end
171+
def get_all_replicas(id, options = Options::GetAllReplicas.new)
172+
resp = @backend.document_get_all_replicas(@bucket_name, @scope_name, @name, id, options.to_backend)
173+
resp.map do |entry|
174+
GetReplicaResult.new do |res|
175+
res.transcoder = options.transcoder
176+
res.cas = entry[:cas]
177+
res.flags = entry[:flags]
178+
res.encoded = entry[:content]
179+
res.is_replica = entry[:is_replica]
180+
end
181+
end
182+
end
172183

173-
# Reads all available replicas, and returns the first found
184+
# Reads all available replicas and active, and returns the first found.
174185
#
175186
# @param [String] id the document id which is used to uniquely identify it.
176187
# @param [Options::GetAnyReplica] options request customization
177188
#
189+
# @example Get document contents
190+
# res = collection.get_any_replica("customer123")
191+
# res.is_active #=> false
192+
# res.content["addresses"]
193+
#
194+
# # {"billing"=>
195+
# # {"line1"=>"123 Any Street", "line2"=>"Anytown", "country"=>"United Kingdom"},
196+
# # "delivery"=>
197+
# # {"line1"=>"123 Any Street", "line2"=>"Anytown", "country"=>"United Kingdom"}}
198+
#
199+
#
178200
# @return [GetReplicaResult]
179-
def get_any_replica(id, options = Options::GetAnyReplica.new) end
201+
def get_any_replica(id, options = Options::GetAnyReplica.new)
202+
resp = @backend.document_get_any_replica(@bucket_name, @scope_name, @name, id, options.to_backend)
203+
GetReplicaResult.new do |res|
204+
res.transcoder = options.transcoder
205+
res.cas = resp[:cas]
206+
res.flags = resp[:flags]
207+
res.encoded = resp[:content]
208+
res.is_replica = resp[:is_replica]
209+
end
210+
end
180211

181212
# Checks if the given document ID exists on the active partition.
182213
#

test/active_support/behaviors/local_cache_behavior.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,4 +291,13 @@ def test_local_cache_should_read_and_write_false
291291
assert_equal false, @cache.read(key)
292292
end
293293
end
294+
295+
def test_local_cache_should_deserialize_entries_on_multi_get
296+
keys = Array.new(5) { SecureRandom.uuid }
297+
values = keys.index_with(true)
298+
@cache.with_local_cache do
299+
assert @cache.write_multi(values)
300+
assert_equal values, @cache.read_multi(*keys)
301+
end
302+
end
294303
end

test/crud_test.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,23 @@ def test_removes_documents
4848
end
4949
end
5050

51+
def test_reads_from_replica
52+
doc_id = uniq_id(:foo)
53+
document = {"value" => 42}
54+
@collection.upsert(doc_id, document)
55+
56+
res = @collection.get_any_replica(doc_id)
57+
assert_equal document, res.content
58+
assert_respond_to res, :replica?
59+
60+
res = @collection.get_all_replicas(doc_id)
61+
refute_empty res
62+
res.each do |entry|
63+
assert_equal document, entry.content
64+
assert_respond_to entry, :replica?
65+
end
66+
end
67+
5168
def test_touch_sets_expiration
5269
document = {"value" => 42}
5370
doc_id = uniq_id(:foo)

0 commit comments

Comments
 (0)