[Segment Replication] Design Proposal #2229

Closed
Tracked by #2194
mch2 opened this issue Feb 23, 2022 · 14 comments
Labels: distributed framework, enhancement

Comments

@mch2 (Member) commented Feb 23, 2022

This document outlines a proposal for implementing segment replication without additional external dependencies. This is a WIP. Please feel free to leave comments below with any thoughts or suggestions!

Objective:
Copy Lucene’s immutable segment files to replicas instead of using document replication, where documents are sent to and re-indexed on all replicas. This will improve indexing throughput and lower resource utilization on replicas at the expense of increased network usage. The RFC has a good introduction to both replication strategies.

Requirements:
With the feature enabled, primary shards will be the only shards indexing into Lucene. Replica shards exist as read-only copies. After a primary shard creates new segment files, either from indexing or a segment merge, they are sent to replicas where they are made searchable.

Proposal:

[Diagram: segment replication architecture (segrep)]

Segment replication will be triggered when a primary shard refreshes. A refresh on a primary shard occurs for various reasons, for example on a schedule, after a merge, or on flush. During a refresh, Lucene performs an OS-level flush that writes the latest in-memory segment files to the filesystem, making them available for replicas to fetch. A listener will trigger after the primary finishes its refresh and send a notification to replicas. This notification will include a ReplicationCheckpoint containing the sequence number of the latest indexed document, the latest commit generation, and the primary term. If the replica determines that it is behind this checkpoint, it initiates a replication process.
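
For illustration, a minimal sketch of the data the checkpoint could carry is shown below. The class and field names are assumptions for this sketch, not a finalized API.

```java
// Minimal sketch of the checkpoint payload described above: shard id, primary term,
// latest commit generation, and the sequence number of the latest indexed document.
// Names and fields are illustrative assumptions, not the final OpenSearch API.
public final class ReplicationCheckpoint {
    private final String shardId;
    private final long primaryTerm;
    private final long segmentsGen; // commit generation of the primary's latest commit
    private final long seqNo;       // sequence number of the latest indexed document

    public ReplicationCheckpoint(String shardId, long primaryTerm, long segmentsGen, long seqNo) {
        this.shardId = shardId;
        this.primaryTerm = primaryTerm;
        this.segmentsGen = segmentsGen;
        this.seqNo = seqNo;
    }

    /** True if this checkpoint is ahead of the replica's local state, i.e. replication is needed. */
    public boolean isAheadOf(ReplicationCheckpoint local) {
        if (primaryTerm != local.primaryTerm) {
            return primaryTerm > local.primaryTerm;
        }
        return segmentsGen > local.segmentsGen || seqNo > local.seqNo;
    }
}
```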

[Diagram: segment replication sequence (seq2)]

  1. Lucene's SegmentInfos object references the set of active segments on the primary. Its metadata includes names and checksums for all current segment files. The replica first fetches the latest SegmentInfos and file metadata associated with the received checkpoint from the primary. When the primary receives this request, it will incRef all files in the checkpoint to ensure they are not merged away until the copy completes.
  2. The replica computes a diff against its local file metadata to determine which files are missing or changed.
  3. The replica requests missing and changed files from the primary shard.
  4. The primary shard sends all requested files to the replica. Replicas validate the files with Lucene checksum.
  5. Replicas validate their store, ensuring all segment files referenced by the copied SegmentInfos are present. They then clean their local directory of any files that are no longer referenced. This removes segments that have been merged away.
  6. The replica refreshes its local OpenSearchDirectoryReader with the copied SegmentInfos object, making the new segments searchable. This is safe to do because we have already copied new/updated segments from the primary that will be referenced by the updated SegmentInfos.
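
To make step 2 concrete, here is a rough sketch of the checksum-based diff a replica could compute between the primary's file metadata and its own store. This is illustrative only; the real implementation would build on the store's metadata snapshots rather than plain maps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of step 2: given file name -> checksum maps for the primary's SegmentInfos
// and for the replica's local store, return the files the replica must fetch.
public final class SegmentFileDiff {

    public static List<String> filesToFetch(Map<String, String> primaryChecksums,
                                            Map<String, String> localChecksums) {
        List<String> missingOrChanged = new ArrayList<>();
        for (Map.Entry<String, String> entry : primaryChecksums.entrySet()) {
            String localChecksum = localChecksums.get(entry.getKey());
            // Fetch the file if the replica does not have it, or if its checksum differs.
            if (localChecksum == null || !localChecksum.equals(entry.getValue())) {
                missingOrChanged.add(entry.getKey());
            }
        }
        return missingOrChanged;
    }
}
```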

Read only replicas:
To make replica shards read only, we need to disable their IndexWriter and no-op all operations inside the engine that attempt to interact with it. The POC does this with a config param sent to the engine that conditionally starts up a writer. To make this cleaner, creating separate engine and shard implementations for replicas is currently being explored.
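
For illustration, a hedged sketch of the POC approach, where a config param decides whether the engine opens a writer at all. Class and method names here are placeholders, not the actual engine API.

```java
// Sketch of the "read only replica" idea: the engine only creates an IndexWriter
// when the shard is a primary; on replicas every write path becomes a no-op,
// since segments arrive via segment replication instead.
public class SegmentReplicationEngineSketch {

    private final boolean isPrimary;  // config param passed to the engine
    private final Object indexWriter; // stands in for Lucene's IndexWriter in this sketch

    public SegmentReplicationEngineSketch(boolean isPrimary) {
        this.isPrimary = isPrimary;
        // Only primaries index into Lucene; replicas never open a writer.
        this.indexWriter = isPrimary ? createWriter() : null;
    }

    public void index(long seqNo, byte[] source) {
        if (!isPrimary) {
            return; // no-op on replicas
        }
        // primary path: add the document via the IndexWriter ...
    }

    private Object createWriter() {
        return new Object(); // placeholder for IndexWriter construction
    }
}
```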

Replication checkpoints:
ReplicationCheckpoints will be processed serially when received by replicas. If there is an active replication event when a checkpoint is received, the replica will store the checkpoint and attempt to process it after the active event completes. Primary shards will compute the latest metadata and SegmentInfos, even if this data is ahead of the requested checkpoint. The checkpoint actually processed will be computed on the primary, returned to the replica, and compared against the latest received checkpoint.
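
A rough sketch of this serialized processing, reusing the ReplicationCheckpoint sketch from above: while a copy is in flight, only the most recently received checkpoint is retained and is processed once the active round completes. Names are placeholders.

```java
// Sketch of serial checkpoint processing on a replica: if replication is already running,
// remember only the latest checkpoint and process it when the active round completes.
public class CheckpointProcessorSketch {

    private boolean replicating = false;
    private ReplicationCheckpoint pending; // latest checkpoint received while busy

    public synchronized void onNewCheckpoint(ReplicationCheckpoint checkpoint) {
        if (replicating) {
            pending = checkpoint; // older pending checkpoints are superseded
            return;
        }
        startReplication(checkpoint);
    }

    public synchronized void onReplicationDone() {
        replicating = false;
        if (pending != null) {
            ReplicationCheckpoint next = pending;
            pending = null;
            startReplication(next);
        }
    }

    private void startReplication(ReplicationCheckpoint checkpoint) {
        replicating = true;
        // ... fetch metadata and files from the primary asynchronously,
        //     then call onReplicationDone() when the copy finishes
    }
}
```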

Durability:
With the initial implementation, all documents will still be sent to each shard and persisted in the transaction log. Operations will not be purged from a replica's copy of the translog until a new commit point is received. While we have the translog to replay operations, we must still guarantee the stability of the index on disk so that it can be started and operations replayed. To maintain OpenSearch's durability guarantee, segments on both primary and replica shards will by default be fsynced to disk on commit. On replicas, a commit is recognized by an increase in the commit generation sent with the ReplicationCheckpoint.
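
As a minimal sketch of this replica-side behavior (the helper methods are hypothetical stand-ins, not real APIs): when an applied checkpoint carries a higher commit generation, the replica fsyncs the copied segments and can then trim its translog.

```java
// Sketch: the replica detects a commit on the primary via a higher commit generation in the
// applied checkpoint, fsyncs the copied segments, and then trims its local translog.
// fsyncSegments() and trimTranslogUpTo() are hypothetical stand-ins for illustration.
public class ReplicaDurabilitySketch {

    private long localCommitGen = 0;

    public void onCheckpointApplied(long appliedCommitGen, long appliedSeqNo) {
        if (appliedCommitGen > localCommitGen) {
            fsyncSegments();                // persist the new commit point to disk
            trimTranslogUpTo(appliedSeqNo); // ops up to this seq no are now durable in segments
            localCommitGen = appliedCommitGen;
        }
    }

    private void fsyncSegments() { /* fsync the segment files of the latest commit */ }
    private void trimTranslogUpTo(long seqNo) { /* purge translog operations at or below seqNo */ }
}
```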

In addition to fsyncs, we need to ensure that replicas have the previous commit point at all times. It is possible for a replica to be added after a primary has been created, so sending only the latest SegmentInfos from the primary could leave the replica in a corrupt state. To ensure each replica has both the latest SegmentInfos and all segments referenced by the latest commit point, the primary will include both in the metadata list it returns to replicas.

For users who do not require this level of durability, or for whom it is acceptable to recover from a snapshot, I propose adding a setting that disables fsyncs on replicas and skips sending the additional merged-away files.

Merge:
There is no special logic for merges. After a primary completes a merge, it will refresh to open up a new reader on new segments and mark the old segments for deletion. This refresh will trigger our listener and publish a new ReplicationCheckpoint to replicas.

Shard startup:
Currently, when replica shards are started they go through the peer recovery service and recover from the active primary. With segment replication, replica shards will be created empty and a notification sent to the primary to initiate tracking of the new replica. This ensures that all future checkpoints are sent to the replica, and replication will be initiated on the primary's next refresh.
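
A hedged sketch of this startup handshake; the method names and transport action string are assumptions for illustration only.

```java
// Sketch of replica startup under segment replication: the shard starts from an empty store
// and asks the primary to begin tracking it; the first copy happens on the primary's next refresh.
public class ReplicaStartupSketch {

    public void startReplica(String allocationId) {
        createEmptyStore();                                                  // no segment copy during recovery itself
        notifyPrimary("internal:index/segrep/track_replica", allocationId);  // placeholder action name
        // The next refresh on the primary publishes a ReplicationCheckpoint,
        // which triggers this replica's first replication round.
    }

    private void createEmptyStore() { /* create an empty Lucene commit for this shard */ }
    private void notifyPrimary(String action, String allocationId) { /* transport call to the primary */ }
}
```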

Performance:
Early performance tests show improvements with segment replication enabled. This run, using OpenSearch Benchmark, showed a ~40-45% drop in CPU and memory usage, a 19% drop in p99 latency, and a 57% increase in p100 throughput.

Instance type: m5.xlarge
Cluster Details: 3 Nodes with 6 shards and 1 replica each.
Test Dataset: Stackoverflow for 3 test iterations with 2 warmup iterations.

IOPS:
Document Replication: (Read 852k + Write 71k) / 1hr = 256 IOPS
Segment Replication: (Read 145k + Write 1M) / 1 hr = 318 IOPS

Total Bandwidth used:
Document Replication: 527 Gb
Segment Replication: 929 Gb
Main branch - Document Replication:

| Metric | P0 | P50 | P99 | P100 |
| --- | --- | --- | --- | --- |
| index - throughput (req/s) | 17,401.60 | 18,661 | N/A | 22,203.40 |
| index - latency (ms) | 2,139.50 | 3,031.90 | 4,315.10 | 6,876.70 |
| CPU | 74 | 98 | 99 | 99 |
| Memory | 45 | 61 | 66 | 68 |

Feature branch - Segment Replication:

| Metric | P0 | P50 | P99 | P100 |
| --- | --- | --- | --- | --- |
| index - throughput (req/s) | 28,795.90 | 31,525.70 | N/A | 34,845.70 |
| index - latency (ms) | 1,213.40 | 1,921 | 3,495.50 | 6,060.60 |
| CPU | 51.5 | 53 | 53 | 53 |
| Memory | 31.5 | 38 | 38 | 38 |

Data from more test runs will be added here as we continue building out the feature and run tests with different cluster configurations.

Failover: #2212

Shard Allocation: #6210

Configuration:
Segment replication will be disabled by default and enabled with a setting during index creation. To begin with, this will be a simple boolean setting.
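
For example, assuming a boolean index setting (the setting key below is a placeholder, not the final name), creating a segment-replicated index via the Java client might look roughly like this:

```java
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.common.settings.Settings;

// Sketch only: "index.replication.segment_replication_enabled" is a placeholder for the
// proposed boolean setting and would be supplied at index creation time.
public class SegmentReplicationIndexSetup {

    public static CreateIndexRequest newSegmentReplicatedIndex(String indexName) {
        return new CreateIndexRequest(indexName).settings(
            Settings.builder()
                .put("index.number_of_replicas", 1)
                .put("index.replication.segment_replication_enabled", true) // placeholder name
                .build()
        );
    }
}
```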

FAQ:

  1. Why bother sending checkpoints? Can the primary send over new segments when they are available?
    This would put a lot of load on primary shards. They would have to first fetch and compute the metadata diff for every replica, then orchestrate the copy and hold that state between requests.
  2. How do we include extension points for future improvements?
    While the first implementation will use the primary shard as our object store and transport layer for messaging, we will extract interfaces allowing for these to be swapped out for other options.
  3. Can we migrate existing indices to use Segment Replication?
    This should be supported in future versions but not part of the initial launch of the feature.
@anasalkouz added the enhancement and distributed framework labels Mar 17, 2022
@getsaurabh02 (Member) commented:
@mch2 Thanks for sharing the detailed comparison. Could you also share the disk IO metrics for both approaches? There is going to be a significant trade-off with the new approach, since the same document will be replicated twice: once as part of the translog and then as part of segment replication. Since we are saving on compute and memory here, it would be good to offset that against the increased disk IO provisioning cost that customers would incur.

Also, the IO cost would grow even higher as the size of indices grows, leading to larger segments. Merges would then create even larger (new) segment files which will need fresh replication to replica nodes, adding to the network and disk IO costs.

@mch2 (Member, Author) commented Mar 23, 2022

Thanks @getsaurabh02, I am gathering IOPS and network data and will add it ASAP.

Made updates above to the Durability section; fsyncs will still be required by default.

@peternied (Member) commented Mar 23, 2022

The new design proposal here is a modification of the existing replication system that introduces new tradeoffs. I am curious about presenting a non-replicated index vs. this proposed segment replication vs. the existing replication system, providing increasing thresholds of reliability with trade-offs in performance and resource utilization. There could be a 4th (or more) option in the future using an external durable store, and it would be useful to see where it aligns on the axes of performance and reliability compared to the other existing options.

@dblock (Member) commented Mar 23, 2022

> Objective: Improve indexing throughput and reduce replica resource utilization by copying Lucene’s immutable segment files to replicas.

I think it's worth explaining in this issue what the current replication mechanism is, so "copying segment files to replicas, instead of document replication that ... "

Collected some questions from an IRL brainstorm:

  • Is the eventual goal to get rid of document replication, or are there reasons to preserve it, or to use both in different scenarios?
  • Any strategies to reduce network traffic?
  • Segment replication could enable rollback, how can it be useful?
  • Can replicas send data to other replicas? Primary could tell a replica to get its data from another replica.
  • Should replicas falling behind past X time (e.g. 30 minutes) be dropped?
  • Can we reduce failover time using segment replication?
  • What does segment replication mean for the number of replicas?
  • How does this get us closer to separating reads and writes?

@mch2 (Member, Author) commented Mar 29, 2022

Added total bandwidth used during the test run and IOPS.

@nknize (Collaborator) commented Mar 29, 2022

> Durability... all documents will still be sent to each shard and persisted in the transaction log

Can we plan to isolate durability? That is, could the translog and IndexWriter live in a "black box" that we can optimize for bulk indexing, with parameters to "tune" durability based on user need (e.g., a DurabilityPolicy, or policies, that lets users disable durability altogether if their use case does not require it)?

Follow up: that "black box" could also be tweaked for bulk indexing. Today the _bulk API places the multi-threading burden on the user. Isolating this to a "black box" would allow us to handle the threading for the user based on system load. Call it a Streaming Index API?

> There is no special logic for merges.

+1, I love how merges are separated today, but what about the newly added Merge On Refresh policy? Have we benchmarked with it enabled? Should we consider making it the default when using segment replication?

@anasalkouz (Member) commented:

> Segment replication will be disabled by default and enabled with a setting during index creation. To begin with this will be a simple boolean setting.

Can we turn segment replication on/off dynamically, instead of having this only at the index creation level?

@andrross (Member) commented:

I'd like to see some discussion on the consistency changes that a user will experience with segment replication. I don't think OpenSearch gives strong transactional guarantees today with document replication, but I believe all acknowledged writes will be visible to a user regardless of whether a query hits a primary or replica. I think this will change with segment replication.

@kartg (Member) commented Mar 29, 2022

> I think it's worth explaining in this issue what the current replication mechanism is, so "copying segment files to replicas, instead of document replication that ... "

I'd suggest just pointing to the RFC section that covers document replication.

> The replica refreshes its local OpenSearchDirectoryReader with the copied SegmentInfos object, making the new segments searchable. This is safe to do because we have already copied new/updated segments from the primary that will be referenced by the updated SegmentInfos.

Is this refresh coupled to the segment replication process, i.e., do we forcefully refresh once replication is complete? Does this mean we need to disable the on-schedule refresh on replicas?

> If there is an active replication event when a checkpoint is received, the replica will store and attempt to process the checkpoint after it completes

Will every checkpoint be queued on the replica?

> With segment replication shards will be created as empty and a notification sent to the primary to initiate tracking on the new replica.

When you say "created as empty", does this imply a change to a "lazy" form of shard startup compared to the existing method of shard recovery?

@vigyasharma (Contributor) commented:

Thanks @mch2 for this well-thought-through design proposal. I like the approach, and it is exciting to think of the possibilities that segment replication will open up. The performance improvements in your POC are impressive.
A couple of early thoughts and questions:

If I understood correctly, the primary pushes a notification to replicas (via a listener) each time it refreshes, and replicas then invoke a GET checkpoint API to fetch the latest delta of segments. Have you considered having replicas just poll the primary periodically for new segments?
The community may eventually want to add third-party plugins for external storage services. It might be hard for them to notify specific replica shards. A simple pull mechanism, once implemented, could be easily extended for such use cases later. (I love that the design today is completely self-sufficient; just thinking along the lines of keeping these doors open.)
In general, pull mechanisms can cause a lot of wasted work, but in this case, I think the overall network chatter will still be lower than with document replication today.

__

Along similar lines, segment replication could, in the future, enable OpenSearch to support a different number of indexing and search shards for an index. Instead of the current setup of having replicas mapped to a single primary, we could have a search shard (replica) subscribe to multiple indexing shards (primaries), fetch their segments, and add them into its own directory. This would give users true indexing and search compute separation. The model of replicas subscribing to a primary and periodically pulling segments from it lends itself more naturally to such a many:1 subscription setup.

kartg added a commit to kartg/OpenSearch that referenced this issue Apr 18, 2022
This is a part of the process of merging our feature branch - feature/segment-replication - back into main by re-PRing our changes from the feature branch.
GatedAutoCloseable currently wraps a subclass of RefCounted. Segment replication adds another subclass, but this also wraps RefCounted. Both subclasses have the same shutdown hook - decRef. This change makes the superclass less generic to increase code convergence.

The breakdown of the plan to merge segment-replication to main is detailed in opensearch-project#2355
Segment replication design proposal - opensearch-project#2229

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
@sachinpkale (Member) commented:

I have a few questions on the config part.

> Configuration:
> Segment replication will be disabled by default and enabled with a setting during index creation. To begin with this will be a simple boolean setting.

  1. As it is an index-level setting, a cluster can have some indices with document-based replication and others with segment-based replication. Is that correct?
  2. Can segment replication be dynamically enabled/disabled for an index?
  3. If the answer to the above question is no, then what is the suggested way to disable the feature if a user is experiencing issues with segment replication?

dblock pushed a commit that referenced this issue Apr 20, 2022
* Refactoring GatedAutoCloseable to AutoCloseableRefCounted

This is a part of the process of merging our feature branch - feature/segment-replication - back into main by re-PRing our changes from the feature branch.
GatedAutoCloseable currently wraps a subclass of RefCounted. Segment replication adds another subclass, but this also wraps RefCounted. Both subclasses have the same shutdown hook - decRef. This change makes the superclass less generic to increase code convergence.

The breakdown of the plan to merge segment-replication to main is detailed in #2355
Segment replication design proposal - #2229

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Minor refactoring in RecoveryState

This change makes two minor updates to RecoveryState -
1. The readRecoveryState API is removed because it can be replaced by an invocation of the constructor
2. The class members of the Timer inner class are changed to private, and accesses are only through the public APIs

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Update RecoveryTargetTests to test Timer subclasses deterministically

This change removes the use of RandomBoolean in testing the Timer classes and creates a dedicated unit test for each. The common test logic is shared via a private method.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Move the RecoveryState.Timer class to a top-level class

This will eventually be reused across both replication use-cases - peer recovery and segment replication.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Further update of timer tests in RecoveryTargetTests

Removes a non-deterministic code path around stopping the timer, and avoids assertThat (deprecated)

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Rename to ReplicationTimer

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Remove RecoveryTargetTests assert on a running timer

Trying to serialize and deserialize a running Timer instance, and then checking for equality leads to flaky test failures when the ser/deser takes time.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
opensearch-trigger-bot bot pushed a commit that referenced this issue Apr 20, 2022 (cherry picked from commit c7c410a)
kartg added a commit that referenced this issue Apr 21, 2022 (cherry picked from commit c7c410a)