Skip to content

Commit f1d7e2d

Browse files
committed
RCBC-443: Support for Subdocument Read from Replica
1 parent a51c47e commit f1d7e2d

File tree

9 files changed

+561
-32
lines changed

9 files changed

+561
-32
lines changed

ext/couchbase

ext/couchbase.cxx

Lines changed: 247 additions & 26 deletions
Large diffs are not rendered by default.

lib/couchbase/collection.rb

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,62 @@ def lookup_in(id, specs, options = Options::LookupIn::DEFAULT)
487487
end
488488
end
489489

490+
# Performs lookups to document fragments. Reads from the active node and all available replicas and returns the
491+
# first result found
492+
#
493+
# @param [String] id the document id which is used to uniquely identify it.
494+
# @param [Array<LookupInSpec>] specs the list of specifications which describe the types of the lookups to perform
495+
# @param [Options::LookupInAnyReplica] options request customization
496+
#
497+
# @return [LookupInReplicaResult]
498+
#
499+
# @raise [Error::DocumentIrretrievable]
500+
# @raise [Error::Timeout]
501+
# @raise [Error::CouchbaseError]
502+
# @raise [Error::FeatureNotAvailable]
503+
def lookup_in_any_replica(id, specs, options = Options::LookupInAnyReplica::DEFAULT)
504+
resp = @backend.document_lookup_in_any_replica(
505+
bucket_name, @scope_name, @name, id,
506+
specs.map do |s|
507+
{
508+
opcode: s.type,
509+
xattr: s.xattr?,
510+
path: s.path,
511+
}
512+
end, options.to_backend
513+
)
514+
extract_lookup_in_replica_result(resp, options)
515+
end
516+
517+
# Performs lookups to document fragments. Reads from the active node and all available replicas and returns all of
518+
# the results
519+
#
520+
# @param [String] id the document id which is used to uniquely identify it.
521+
# @param [Array<LookupInSpec>] specs the list of specifications which describe the types of the lookups to perform
522+
# @param [Options::LookupInAllReplicas] options request customization
523+
#
524+
# @return [Array<LookupInReplicaResult>]
525+
#
526+
# @raise [Error::DocumentIrretrievable]
527+
# @raise [Error::Timeout]
528+
# @raise [Error::CouchbaseError]
529+
# @raise [Error::FeatureNotAvailable]
530+
def lookup_in_all_replicas(id, specs, options = Options::LookupInAllReplicas::DEFAULT)
531+
resp = @backend.document_lookup_in_all_replicas(
532+
bucket_name, @scope_name, @name, id,
533+
specs.map do |s|
534+
{
535+
opcode: s.type,
536+
xattr: s.xattr?,
537+
path: s.path,
538+
}
539+
end, options.to_backend
540+
)
541+
resp.map do |entry|
542+
extract_lookup_in_replica_result(entry, options)
543+
end
544+
end
545+
490546
# Performs mutations to document fragments
491547
#
492548
# @param [String] id the document id which is used to uniquely identify it.
@@ -581,6 +637,23 @@ def extract_mutation_token(resp)
581637
end
582638
end
583639

640+
def extract_lookup_in_replica_result(resp, options)
641+
LookupInReplicaResult.new do |res|
642+
res.transcoder = options.transcoder
643+
res.cas = resp[:cas]
644+
res.deleted = resp[:deleted]
645+
res.is_replica = resp[:is_replica]
646+
res.encoded = resp[:fields].map do |field|
647+
SubDocumentField.new do |f|
648+
f.exists = field[:exists]
649+
f.index = field[:index]
650+
f.path = field[:path]
651+
f.value = field[:value]
652+
end
653+
end
654+
end
655+
end
656+
584657
# @api private
585658
# TODO: deprecate in 3.1
586659
GetOptions = ::Couchbase::Options::Get

lib/couchbase/collection_options.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,18 @@ def get_field_at_index(path_or_index)
230230
end
231231
end
232232

233+
class LookupInReplicaResult < LookupInResult
234+
# @return [Boolean] true if the document was read from a replica node
235+
attr_accessor :is_replica
236+
alias replica? is_replica
237+
238+
# @yieldparam [LookupInReplicaResult] self
239+
def initialize
240+
super
241+
yield self if block_given?
242+
end
243+
end
244+
233245
class MutateInResult < MutationResult
234246
# Decodes the content at the given index
235247
#

lib/couchbase/options.rb

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,6 +1026,80 @@ def to_backend
10261026
DEFAULT = LookupIn.new.freeze
10271027
end
10281028

1029+
# Options for {Collection#lookup_in_any_replica}
1030+
class LookupInAnyReplica < Base
1031+
attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)]
1032+
1033+
# Creates an instance of options for {Collection#lookup_in_any_replica}
1034+
#
1035+
# @param [JsonTranscoder, #decode(String)] transcoder used for encoding
1036+
#
1037+
# @param [Integer, #in_milliseconds, nil] timeout
1038+
# @param [Proc, nil] retry_strategy the custom retry strategy, if set
1039+
# @param [Hash, nil] client_context the client context data, if set
1040+
# @param [Span, nil] parent_span if set holds the parent span, that should be used for this request
1041+
#
1042+
# @yieldparam [LookupIn] self
1043+
def initialize(transcoder: JsonTranscoder.new,
1044+
timeout: nil,
1045+
retry_strategy: nil,
1046+
client_context: nil,
1047+
parent_span: nil)
1048+
super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span)
1049+
@transcoder = transcoder
1050+
yield self if block_given?
1051+
end
1052+
1053+
# @api private
1054+
def to_backend
1055+
{
1056+
timeout: Utils::Time.extract_duration(@timeout),
1057+
}
1058+
end
1059+
1060+
# @api private
1061+
# @return [Boolean]
1062+
attr_accessor :access_deleted
1063+
1064+
# @api private
1065+
DEFAULT = LookupInAnyReplica.new.freeze
1066+
end
1067+
1068+
# Options for {Collection#lookup_in_all_replicas}
1069+
class LookupInAllReplicas < Base
1070+
attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)]
1071+
1072+
# Creates an instance of options for {Collection#lookup_in_all_replicas}
1073+
#
1074+
# @param [JsonTranscoder, #decode(String)] transcoder used for encoding
1075+
#
1076+
# @param [Integer, #in_milliseconds, nil] timeout
1077+
# @param [Proc, nil] retry_strategy the custom retry strategy, if set
1078+
# @param [Hash, nil] client_context the client context data, if set
1079+
# @param [Span, nil] parent_span if set holds the parent span, that should be used for this request
1080+
#
1081+
# @yieldparam [LookupInAllReplicas] self
1082+
def initialize(transcoder: JsonTranscoder.new,
1083+
timeout: nil,
1084+
retry_strategy: nil,
1085+
client_context: nil,
1086+
parent_span: nil)
1087+
super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span)
1088+
@transcoder = transcoder
1089+
yield self if block_given?
1090+
end
1091+
1092+
# @api private
1093+
def to_backend
1094+
{
1095+
timeout: Utils::Time.extract_duration(@timeout),
1096+
}
1097+
end
1098+
1099+
# @api private
1100+
DEFAULT = LookupInAllReplicas.new.freeze
1101+
end
1102+
10291103
# Options for {Collection#scan}
10301104
class Scan < Base
10311105
attr_accessor :ids_only # @return [Boolean]

test/query_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ def retry_on(exception)
224224
def test_scoped_query
225225
skip("The server does not support scoped queries (#{env.server_version})") unless env.server_version.supports_scoped_queries?
226226

227-
scope_name = uniq_id(:scope).delete(".")[0, 30]
228-
collection_name = uniq_id(:collection).delete(".")[0, 30]
227+
scope_name = uniq_id(:scope).delete("-")[0, 30]
228+
collection_name = uniq_id(:collection).delete("-")[0, 30]
229229

230230
manager = @bucket.collections
231231
ns_uid = manager.create_scope(scope_name)

test/scan_test.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,8 @@ def test_range_scan_multiple_options
321321

322322
def test_range_scan_collection_does_not_exist
323323
collection = @bucket.scope("_default").collection(uniq_id(:nonexistent))
324-
scan_result = collection.scan(RangeScan.new)
325324
assert_raises(Error::CollectionNotFound) do
326-
validate_scan(scan_result, [])
325+
collection.scan(RangeScan.new)
327326
end
328327
end
329328

test/subdoc_test.rb

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,5 +1310,143 @@ def test_create_tombstones
13101310
assert_equal({"field" => "b"}, res.content(0))
13111311
assert_predicate res, :deleted?, "the document should be marked as 'deleted'"
13121312
end
1313+
1314+
def test_lookup_in_any_replica_get
1315+
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
1316+
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?
1317+
1318+
doc_id = uniq_id(:foo)
1319+
document = {"value" => 42}
1320+
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))
1321+
1322+
res = @collection.lookup_in_any_replica(doc_id, [
1323+
LookupInSpec.get("value"),
1324+
])
1325+
1326+
assert_equal 42, res.content(0)
1327+
assert_respond_to res, :replica?
1328+
end
1329+
1330+
def test_lookup_in_all_replicas_get
1331+
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
1332+
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?
1333+
1334+
doc_id = uniq_id(:foo)
1335+
document = {"value" => 42}
1336+
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))
1337+
1338+
res = @collection.lookup_in_all_replicas(doc_id, [
1339+
LookupInSpec.get("value"),
1340+
])
1341+
1342+
refute_empty res
1343+
1344+
res.each do |entry|
1345+
assert_equal 42, entry.content(0)
1346+
assert_respond_to entry, :replica?
1347+
end
1348+
end
1349+
1350+
def test_lookup_in_any_replica_get_doc
1351+
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
1352+
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?
1353+
1354+
doc_id = uniq_id(:foo)
1355+
document = {"value" => 42}
1356+
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))
1357+
1358+
res = @collection.lookup_in_any_replica(doc_id, [
1359+
LookupInSpec.get(""),
1360+
])
1361+
1362+
assert_equal document, res.content(0)
1363+
assert_respond_to res, :replica?
1364+
end
1365+
1366+
def test_lookup_in_all_replicas_get_doc
1367+
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
1368+
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?
1369+
1370+
doc_id = uniq_id(:foo)
1371+
document = {"value" => 42}
1372+
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))
1373+
1374+
res = @collection.lookup_in_all_replicas(doc_id, [
1375+
LookupInSpec.get(""),
1376+
])
1377+
1378+
refute_empty res
1379+
1380+
res.each do |entry|
1381+
assert_equal document, entry.content(0)
1382+
assert_respond_to entry, :replica?
1383+
end
1384+
end
1385+
1386+
def test_lookup_in_any_replica_exists
1387+
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
1388+
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?
1389+
1390+
doc_id = uniq_id(:foo)
1391+
document = {"value" => 42}
1392+
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))
1393+
1394+
options = Options::LookupInAnyReplica.new
1395+
res = @collection.lookup_in_any_replica(doc_id, [
1396+
LookupInSpec.exists("value"),
1397+
LookupInSpec.exists("foo"),
1398+
], options)
1399+
1400+
assert res.exists?(0)
1401+
refute res.exists?(1)
1402+
assert_respond_to res, :replica?
1403+
end
1404+
1405+
def test_lookup_in_all_replicas_exist
1406+
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
1407+
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?
1408+
1409+
doc_id = uniq_id(:foo)
1410+
document = {"value" => 42}
1411+
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))
1412+
1413+
options = Options::LookupInAllReplicas.new
1414+
res = @collection.lookup_in_all_replicas(doc_id, [
1415+
LookupInSpec.exists("value"),
1416+
LookupInSpec.exists("foo"),
1417+
], options)
1418+
1419+
refute_empty res
1420+
1421+
res.each do |entry|
1422+
assert entry.exists?(0)
1423+
refute entry.exists?(1)
1424+
assert_respond_to entry, :replica?
1425+
end
1426+
end
1427+
1428+
def test_lookup_in_any_replica_bad_key
1429+
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
1430+
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?
1431+
1432+
doc_id = uniq_id(:foo)
1433+
assert_raises(Error::DocumentIrretrievable) do
1434+
@collection.lookup_in_any_replica(doc_id, [
1435+
LookupInSpec.get("value"),
1436+
])
1437+
end
1438+
end
1439+
1440+
def test_lookup_in_all_replicas_bad_key
1441+
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
1442+
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?
1443+
1444+
doc_id = uniq_id(:foo)
1445+
assert_raises(Error::DocumentNotFound) do
1446+
@collection.lookup_in_all_replicas(doc_id, [
1447+
LookupInSpec.get("value"),
1448+
])
1449+
end
1450+
end
13131451
end
13141452
end

test/test_helper.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ def cheshire_cat?
5555
@version >= Gem::Version.create("7.0.0")
5656
end
5757

58+
def elixir?
59+
@version >= Gem::Version.create("7.5.0")
60+
end
61+
62+
def trinity?
63+
@version >= Gem::Version.create("7.6.0")
64+
end
65+
5866
def supports_collections?
5967
cheshire_cat?
6068
end
@@ -84,7 +92,11 @@ def is_rcbc_408_applicable?
8492
end
8593

8694
def supports_range_scan?
87-
@version >= Gem::Version.create("7.5.0")
95+
elixir?
96+
end
97+
98+
def supports_subdoc_read_from_replica?
99+
elixir?
88100
end
89101
end
90102

0 commit comments

Comments
 (0)