Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC for segment replication. #2075

Conversation

mch2
Copy link
Member

@mch2 mch2 commented Feb 9, 2022

Signed-off-by: Marc Handalian handalm@amazon.com

Description

In this POC replicas are configured as read only.
After primary shard refresh, a checkpoint is published over the transport layer to replicas.
Once received, replicas fetch files in the checkpoint from the primary shard.
This initial commit ignores failover, retention leases, and shard allocation.

There are numerous locations where code is commented out with TODOs. Gradle check and ITs are expected to fail.

Guide to reading this -

  1. When a replica is created it now starts using SegmentReplicationService which is similar to PeerRecoveryTargetService but uses a ReplicationTarget class instead of a RecoveryTarget to drive operations on the replica shard.
  2. ReplicationTarget will bootstrap the shard and request checkpoint information, missing/changed files, and the SegmentInfos bytes from the primary. After files are received, it will update the current reader's SegmentInfos inside OpenSearchReaderManager and refresh. This step is the same as Lucene's SegmentInfosSearcherManager.
  3. On the Primary shard, SegmentReplicationPrimaryService handles requests to fetch checkpoints & send files. It delegates to PrimaryShardReplicationHandler to send files - similar to recovery's RecoverySourceHandler. SegmentInfos is fetched directly from the current reader running on the primary.
  4. A Refresh listener CheckpointRefreshListener publishes a new checkpoint over the transport layer using TransportCheckpointPublisher when the primary shard refreshes.
  5. Checkpoints are sent to replicas and received in TransportPublishShardCheckpointAction, when received it invokes IndexShard.onNewCheckpoint to kick off another replication with SegmentReplicationService.

Issues Resolved

closes #1544

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

In this POC replicas are configured as read only by not creating an indexwriter.
After primary shards refresh, a checkpoint is sent over the transport layer to replicas.
Once received, replicas fetch files in the checkpoint from the primary shard.
This initial commit ignores failover, retention leases, and shard allocation.

Signed-off-by: Marc Handalian <handalm@amazon.com>
@mch2 mch2 requested a review from a team as a code owner February 9, 2022 05:31
@opensearch-ci-bot
Copy link
Collaborator

Can one of the admins verify this patch?

@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure 507bdab
Log 2303

Reports 2303

This change will force an fsync on replicas when a new commit point is received.

Signed-off-by: Marc Handalian <handalm@amazon.com>
@mch2 mch2 force-pushed the feature/segment-replication branch from 70a98c6 to 12bf217 Compare February 15, 2022 18:29
@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure 12bf217
Log 2407

Reports 2407

Signed-off-by: Marc Handalian <handalm@amazon.com>
@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure fc7ca0b
Log 2415

Reports 2415

Signed-off-by: Marc Handalian <handalm@amazon.com>
…y and mark the shard as active.

With this change IndexShard.startRecovery will only set up a replica shard and mark
it as tracked with the primary.  We will then only start replication after the
primary has refreshed after performing the first operation.
This also avoids a condition when the initial recovery is trying to replicate
from a primary shard that has not performed any operations and waits indefinately for
a replica to catch up to the latest sequence number.  This change also ensures that
we are only ever performing one replication event at any given moment.

Signed-off-by: Marc Handalian <handalm@amazon.com>
@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure b1a0510
Log 2555

Reports 2555

…checkpoint.

This change ensures we do not start a replication sequence if we already have the checkpoint.
This changes the checkpoint published from the primary to the latest processed checkpoint instead of the latest persisted.

Signed-off-by: Marc Handalian <handalm@amazon.com>
@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure 9728020
Log 2567

Reports 2567

To satisfy this invariant, This change updates the TRACK_SHARD action to clone
the primary's retention lease and use it as the replicas.

Signed-off-by: Marc Handalian <handalm@amazon.com>
@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure dac3a4a
Log 2606

Reports 2606

…ng is setup.

Fixes to sleep the thread instead of incorrectly using a monitor.

Signed-off-by: Marc Handalian <handalm@amazon.com>
@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure db0baac
Log 2609

Reports 2609

Copy link
Member

@kartg kartg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Publishing my review (which is basically a collection of notes and questions) to share context.

I'll approve these changes since they're going into the feature branch and continue to add comments so we can share different parts of work.

@@ -98,7 +98,7 @@ protected void shardOperationOnPrimary(
@Override
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
replica.flush(request.getRequest());
// replica.flush(request.getRequest());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to not break document replication,

  1. Leave this uncommented
  2. Pull in the isSegmentReplicationEnabled method from my PR
  3. Add a if-clause for isSegmentReplicationEnabled within IndexShard's flush method and no-op it

Even though the flush method returns a CommitId, I checked all invocations in the codebase and nowhere is the return value accessed/used. So it's safe to return null or even change the method to have a void return type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -61,11 +61,11 @@
declareBroadcastFields(PARSER);
}

RefreshResponse(StreamInput in) throws IOException {
public RefreshResponse(StreamInput in) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like you only use the other constructor. Does this one need to be public?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to inside of PublishCheckpointAction to wire up the superclass with a response constructor.

private PublishCheckpointAction() {super(NAME, RefreshResponse::new);}

With that said - this was me being quick and dirty again for the poc. We shouldn't be reusing RefreshResponse here and instead create a new response type. Will make a separate task.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* @param request {@link PublishCheckpointRequest} The PublishCheckpointRequest
* @param listener A listener to be notified with a result
*/
void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you're re-using RefreshResponse as the response for this request type? Would there be value in declaring a separate CheckpointResponse (or similar) type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100%, I was cutting corners. Same as comment above making a separate issue to clean this up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recoveryStateFactory
);
recoveryStateFactory,
checkpointPublisher);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooof, we should really think about refactoring this constructor, and the newIndexService method 🤢

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree - I think we should revisit this after we have separate implementations of IndexShard and address how to load things as a module.

Comment on lines +917 to +919
if (shard.routingEntry().primary()) {
shard.scheduledRefresh();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retain code as-is and move the gating into IndexShard, either based on isSegmentReplicationEnabled or by using a separate IndexShard subclass that implements/overrides scheduleRefresh as a no-op

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack - #2197


public class PublishCheckpointAction extends ActionType<RefreshResponse> {
public static final PublishCheckpointAction INSTANCE = new PublishCheckpointAction();
public static final String NAME = "indices:admin/publishCheckpoint";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline - this doesn't need to be an admin action

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


import java.io.IOException;

public class PublishCheckpointRequest extends BroadcastRequest<PublishCheckpointRequest> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, we should refactor this to no longer be a BroadcastRequest

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -249,6 +249,10 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an interface - does the presence of seqNo make sense for this APi, or is this a POC signature driven by the need to call markSeqNoAsProcessed in the implementation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latter, but I don't know of another way than passing this down or explicitly calling a method on engine to set the processed cp here. Local checkpoints are only stored in user data on commit -

Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));

assert gen == infos.getGeneration();
externalReaderManager.internalReaderManager.setCurrentInfos(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this change the checkpoint being published?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally markSeqNoAsProcessed is called on during index in InternalEngine. This isn't invoked now on repilcas so we need to bring this seqNo up to date after it copies over segments.

This is read in multiple places, but for our case its used to ensure a new checkpoint needs to be processed when received from primaries.

}

@Override
public SegmentInfos getLatestSegmentInfos() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question - why is this a public method? Why would we allow segment infos to be accessed in an unsafe manner?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to get a point in time snapshot of the segmentInfos when we don't care about preserving the files. This is used in two spots:

  1. When a primary reads its latest info to populate a checkpoint before publishing. Primaries will always return the latest checkpoint data when requested directly and starting a copy sequence. During checkpoint publish we don't need to preserve those files yet before a replica asks for them.
  2. When a checkpoint is received on a replica and it gets its local view to compare against.

Copy link
Contributor

@Poojita-Raj Poojita-Raj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - good work getting it all working!

@@ -95,6 +95,7 @@
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private boolean isPrimary;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: move this attribute to IndexShard

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndexShard actually populates this when wiring up the engine so we can conditionally make InternalEngine readOnly for replicas. I think a better route here is wiring up a different engine class for replicas.

* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
*/
public Engine.SegmentInfosRef getLatestSegmentInfosSafe() throws EngineException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment explaining the difference between safe version of getLatestSegmentInfos and regular - that it increments the reference count in safe in order to ensure it's not deleted after reading

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -79,6 +79,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF

final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be a mistake - can revert

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

@@ -0,0 +1,182 @@
/*
* SPDX-License-Identifier: Apache-2.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can rename the file to SegmentReplicationReplicaService to follow the same format as SegmentReplicationPrimaryService and make its functionality clear. Else need to come up with a name that's less of a handful :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't love either of those names but you're right. I didn't like using source/target because our source is always the primary shard.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this to SegmentReplicationReplicaService.

}

@Override
protected void closeInternal() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment: called when copystate removed from cache and no more references to it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add a comment inside the primary service, this class does not know anything about where it is cached.

checkpointCopyState.putIfAbsent(copyState.getCheckpoint(), copyState);
}

public CopyState getCopyStateForCheckpoint(ReplicationCheckpoint checkpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: If multiple replicas are asking for same checkpoint, we want it to still stay in cache after fetching so we need a reference count to be incremented

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call i forgot this! thanks.

return replicationRef;
}

// /** cancel the replication with the given id (if found) and remove it from the replication collection */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can replace this commented code with a TODO?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uncommented this.

Signed-off-by: Marc Handalian <handalm@amazon.com>
This removes a wait in favor of throwing a retryable exception.

Signed-off-by: Marc Handalian <handalm@amazon.com>
@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure f619be7
Log 2679

Reports 2679

Renamed SegmentReplicationService -> SegmentReplicationReplicaService.
Removed if conditions in SyncedFlushService and TransportShardFlushAction.
Improved comments and documentation.

Signed-off-by: Marc Handalian <handalm@amazon.com>
@mch2 mch2 force-pushed the feature/segment-replication branch from f258aca to f646a9e Compare February 22, 2022 01:57
@mch2 mch2 merged commit 11a1f52 into opensearch-project:feature/segment-replication Feb 22, 2022
@mch2
Copy link
Member Author

mch2 commented Feb 22, 2022

Merging as is and so we can start splitting up issues to iterate on this with #2194

@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure f258aca630ab61ac26f7b8e0bb03278fc4209776
Log 2689

Reports 2689

@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure f646a9e
Log 2690

Reports 2690

Poojita-Raj pushed a commit to Poojita-Raj/OpenSearch that referenced this pull request Feb 28, 2022
* Initial POC for segment replication.

In this POC replicas are configured as read only by not creating an indexwriter.
After primary shards refresh, a checkpoint is sent over the transport layer to replicas.
Once received, replicas fetch files in the checkpoint from the primary shard.
This initial commit ignores failover, retention leases, and shard allocation.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove bypass of fsync on primaries and force fsync on replicas.

This change will force an fsync on replicas when a new commit point is received.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix replicas from processing checkpoints from other indices.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove explicit fsync when every file is copied.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix recovery states to move to completed during intiial shard recovery and mark the shard as active.

With this change IndexShard.startRecovery will only set up a replica shard and mark
it as tracked with the primary.  We will then only start replication after the
primary has refreshed after performing the first operation.
This also avoids a condition when the initial recovery is trying to replicate
from a primary shard that has not performed any operations and waits indefinately for
a replica to catch up to the latest sequence number.  This change also ensures that
we are only ever performing one replication event at any given moment.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Ignore replication checkpoints if we are already up to the published checkpoint.

This change ensures we do not start a replication sequence if we already have the checkpoint.
This changes the checkpoint published from the primary to the latest processed checkpoint instead of the latest persisted.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix retention lease invariant in ReplicationTracker.

To satisfy this invariant, This change updates the TRACK_SHARD action to clone
the primary's retention lease and use it as the replicas.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix SegmentReplicationPrimaryService to wait until replica ShardRouting is setup.

Fixes to sleep the thread instead of incorrectly using a monitor.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove duplicate method to fetch local checkpoint.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix startup when replicas are not listed in primary's routing table.

This removes a wait in favor of throwing a retryable exception.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR cleanup.

Renamed SegmentReplicationService -> SegmentReplicationReplicaService.
Removed if conditions in SyncedFlushService and TransportShardFlushAction.
Improved comments and documentation.

Signed-off-by: Marc Handalian <handalm@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants