-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC for segment replication. #2075
POC for segment replication. #2075
Conversation
In this POC replicas are configured as read only by not creating an indexwriter. After primary shards refresh, a checkpoint is sent over the transport layer to replicas. Once received, replicas fetch files in the checkpoint from the primary shard. This initial commit ignores failover, retention leases, and shard allocation. Signed-off-by: Marc Handalian <handalm@amazon.com>
Can one of the admins verify this patch? |
This change will force an fsync on replicas when a new commit point is received. Signed-off-by: Marc Handalian <handalm@amazon.com>
70a98c6
to
12bf217
Compare
Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: Marc Handalian <handalm@amazon.com>
…y and mark the shard as active. With this change IndexShard.startRecovery will only set up a replica shard and mark it as tracked with the primary. We will then only start replication after the primary has refreshed after performing the first operation. This also avoids a condition when the initial recovery is trying to replicate from a primary shard that has not performed any operations and waits indefinately for a replica to catch up to the latest sequence number. This change also ensures that we are only ever performing one replication event at any given moment. Signed-off-by: Marc Handalian <handalm@amazon.com>
…checkpoint. This change ensures we do not start a replication sequence if we already have the checkpoint. This changes the checkpoint published from the primary to the latest processed checkpoint instead of the latest persisted. Signed-off-by: Marc Handalian <handalm@amazon.com>
To satisfy this invariant, This change updates the TRACK_SHARD action to clone the primary's retention lease and use it as the replicas. Signed-off-by: Marc Handalian <handalm@amazon.com>
…ng is setup. Fixes to sleep the thread instead of incorrectly using a monitor. Signed-off-by: Marc Handalian <handalm@amazon.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Publishing my review (which is basically a collection of notes and questions) to share context.
I'll approve these changes since they're going into the feature branch and continue to add comments so we can share different parts of work.
@@ -98,7 +98,7 @@ protected void shardOperationOnPrimary( | |||
@Override | |||
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) { | |||
ActionListener.completeWith(listener, () -> { | |||
replica.flush(request.getRequest()); | |||
// replica.flush(request.getRequest()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to not break document replication,
- Leave this uncommented
- Pull in the
isSegmentReplicationEnabled
method from my PR - Add a if-clause for
isSegmentReplicationEnabled
within IndexShard's flush method and no-op it
Even though the flush
method returns a CommitId, I checked all invocations in the codebase and nowhere is the return value accessed/used. So it's safe to return null
or even change the method to have a void
return type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -61,11 +61,11 @@ | |||
declareBroadcastFields(PARSER); | |||
} | |||
|
|||
RefreshResponse(StreamInput in) throws IOException { | |||
public RefreshResponse(StreamInput in) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like you only use the other constructor. Does this one need to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used to inside of PublishCheckpointAction
to wire up the superclass with a response constructor.
private PublishCheckpointAction() {super(NAME, RefreshResponse::new);}
With that said - this was me being quick and dirty again for the poc. We shouldn't be reusing RefreshResponse here and instead create a new response type. Will make a separate task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* @param request {@link PublishCheckpointRequest} The PublishCheckpointRequest | ||
* @param listener A listener to be notified with a result | ||
*/ | ||
void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why you're re-using RefreshResponse
as the response for this request type? Would there be value in declaring a separate CheckpointResponse
(or similar) type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100%, I was cutting corners. Same as comment above making a separate issue to clean this up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recoveryStateFactory | ||
); | ||
recoveryStateFactory, | ||
checkpointPublisher); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ooof, we should really think about refactoring this constructor, and the newIndexService
method 🤢
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree - I think we should revisit this after we have separate implementations of IndexShard and address how to load things as a module.
if (shard.routingEntry().primary()) { | ||
shard.scheduledRefresh(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Retain code as-is and move the gating into IndexShard
, either based on isSegmentReplicationEnabled
or by using a separate IndexShard subclass that implements/overrides scheduleRefresh
as a no-op
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack - #2197
|
||
public class PublishCheckpointAction extends ActionType<RefreshResponse> { | ||
public static final PublishCheckpointAction INSTANCE = new PublishCheckpointAction(); | ||
public static final String NAME = "indices:admin/publishCheckpoint"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline - this doesn't need to be an admin
action
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
import java.io.IOException; | ||
|
||
public class PublishCheckpointRequest extends BroadcastRequest<PublishCheckpointRequest> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, we should refactor this to no longer be a BroadcastRequest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -249,6 +249,10 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { | |||
} | |||
} | |||
|
|||
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is an interface - does the presence of seqNo
make sense for this APi, or is this a POC signature driven by the need to call markSeqNoAsProcessed
in the implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The latter, but I don't know of another way than passing this down or explicitly calling a method on engine to set the processed cp here. Local checkpoints are only stored in user data on commit -
Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
assert gen == infos.getGeneration(); | ||
externalReaderManager.internalReaderManager.setCurrentInfos(infos); | ||
externalReaderManager.maybeRefresh(); | ||
localCheckpointTracker.markSeqNoAsProcessed(seqNo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this change the checkpoint being published?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally markSeqNoAsProcessed is called on during index in InternalEngine. This isn't invoked now on repilcas so we need to bring this seqNo up to date after it copies over segments.
This is read in multiple places, but for our case its used to ensure a new checkpoint needs to be processed when received from primaries.
} | ||
|
||
@Override | ||
public SegmentInfos getLatestSegmentInfos() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question - why is this a public method? Why would we allow segment infos to be accessed in an unsafe manner?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used to get a point in time snapshot of the segmentInfos when we don't care about preserving the files. This is used in two spots:
- When a primary reads its latest info to populate a checkpoint before publishing. Primaries will always return the latest checkpoint data when requested directly and starting a copy sequence. During checkpoint publish we don't need to preserve those files yet before a replica asks for them.
- When a checkpoint is received on a replica and it gets its local view to compare against.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - good work getting it all working!
@@ -95,6 +95,7 @@ | |||
private final CircuitBreakerService circuitBreakerService; | |||
private final LongSupplier globalCheckpointSupplier; | |||
private final Supplier<RetentionLeases> retentionLeasesSupplier; | |||
private boolean isPrimary; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: move this attribute to IndexShard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IndexShard actually populates this when wiring up the engine so we can conditionally make InternalEngine readOnly for replicas. I think a better route here is wiring up a different engine class for replicas.
* Snapshots the most recent safe index commit from the currently running engine. | ||
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed. | ||
*/ | ||
public Engine.SegmentInfosRef getLatestSegmentInfosSafe() throws EngineException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment explaining the difference between safe version of getLatestSegmentInfos and regular - that it increments the reference count in safe in order to ensure it's not deleted after reading
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -79,6 +79,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF | |||
|
|||
final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap(); | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to be a mistake - can revert
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
@@ -0,0 +1,182 @@ | |||
/* | |||
* SPDX-License-Identifier: Apache-2.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can rename the file to SegmentReplicationReplicaService to follow the same format as SegmentReplicationPrimaryService and make its functionality clear. Else need to come up with a name that's less of a handful :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I don't love either of those names but you're right. I didn't like using source/target because our source is always the primary shard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this to SegmentReplicationReplicaService.
} | ||
|
||
@Override | ||
protected void closeInternal() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment: called when copystate removed from cache and no more references to it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add a comment inside the primary service, this class does not know anything about where it is cached.
checkpointCopyState.putIfAbsent(copyState.getCheckpoint(), copyState); | ||
} | ||
|
||
public CopyState getCopyStateForCheckpoint(ReplicationCheckpoint checkpoint) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: If multiple replicas are asking for same checkpoint, we want it to still stay in cache after fetching so we need a reference count to be incremented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good call i forgot this! thanks.
return replicationRef; | ||
} | ||
|
||
// /** cancel the replication with the given id (if found) and remove it from the replication collection */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can replace this commented code with a TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uncommented this.
Signed-off-by: Marc Handalian <handalm@amazon.com>
This removes a wait in favor of throwing a retryable exception. Signed-off-by: Marc Handalian <handalm@amazon.com>
Renamed SegmentReplicationService -> SegmentReplicationReplicaService. Removed if conditions in SyncedFlushService and TransportShardFlushAction. Improved comments and documentation. Signed-off-by: Marc Handalian <handalm@amazon.com>
f258aca
to
f646a9e
Compare
Merging as is and so we can start splitting up issues to iterate on this with #2194 |
❌ Gradle Check failure f258aca630ab61ac26f7b8e0bb03278fc4209776 |
* Initial POC for segment replication. In this POC replicas are configured as read only by not creating an indexwriter. After primary shards refresh, a checkpoint is sent over the transport layer to replicas. Once received, replicas fetch files in the checkpoint from the primary shard. This initial commit ignores failover, retention leases, and shard allocation. Signed-off-by: Marc Handalian <handalm@amazon.com> * Remove bypass of fsync on primaries and force fsync on replicas. This change will force an fsync on replicas when a new commit point is received. Signed-off-by: Marc Handalian <handalm@amazon.com> * Fix replicas from processing checkpoints from other indices. Signed-off-by: Marc Handalian <handalm@amazon.com> * Remove explicit fsync when every file is copied. Signed-off-by: Marc Handalian <handalm@amazon.com> * Fix recovery states to move to completed during intiial shard recovery and mark the shard as active. With this change IndexShard.startRecovery will only set up a replica shard and mark it as tracked with the primary. We will then only start replication after the primary has refreshed after performing the first operation. This also avoids a condition when the initial recovery is trying to replicate from a primary shard that has not performed any operations and waits indefinately for a replica to catch up to the latest sequence number. This change also ensures that we are only ever performing one replication event at any given moment. Signed-off-by: Marc Handalian <handalm@amazon.com> * Ignore replication checkpoints if we are already up to the published checkpoint. This change ensures we do not start a replication sequence if we already have the checkpoint. This changes the checkpoint published from the primary to the latest processed checkpoint instead of the latest persisted. Signed-off-by: Marc Handalian <handalm@amazon.com> * Fix retention lease invariant in ReplicationTracker. To satisfy this invariant, This change updates the TRACK_SHARD action to clone the primary's retention lease and use it as the replicas. Signed-off-by: Marc Handalian <handalm@amazon.com> * Fix SegmentReplicationPrimaryService to wait until replica ShardRouting is setup. Fixes to sleep the thread instead of incorrectly using a monitor. Signed-off-by: Marc Handalian <handalm@amazon.com> * Remove duplicate method to fetch local checkpoint. Signed-off-by: Marc Handalian <handalm@amazon.com> * Fix startup when replicas are not listed in primary's routing table. This removes a wait in favor of throwing a retryable exception. Signed-off-by: Marc Handalian <handalm@amazon.com> * PR cleanup. Renamed SegmentReplicationService -> SegmentReplicationReplicaService. Removed if conditions in SyncedFlushService and TransportShardFlushAction. Improved comments and documentation. Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: Marc Handalian handalm@amazon.com
Description
In this POC replicas are configured as read only.
After primary shard refresh, a checkpoint is published over the transport layer to replicas.
Once received, replicas fetch files in the checkpoint from the primary shard.
This initial commit ignores failover, retention leases, and shard allocation.
There are numerous locations where code is commented out with TODOs. Gradle check and ITs are expected to fail.
Guide to reading this -
SegmentReplicationService
which is similar to PeerRecoveryTargetService but uses aReplicationTarget
class instead of a RecoveryTarget to drive operations on the replica shard.ReplicationTarget
will bootstrap the shard and request checkpoint information, missing/changed files, and theSegmentInfos
bytes from the primary. After files are received, it will update the current reader's SegmentInfos insideOpenSearchReaderManager
and refresh. This step is the same as Lucene's SegmentInfosSearcherManager.SegmentReplicationPrimaryService
handles requests to fetch checkpoints & send files. It delegates toPrimaryShardReplicationHandler
to send files - similar to recovery's RecoverySourceHandler. SegmentInfos is fetched directly from the current reader running on the primary.CheckpointRefreshListener
publishes a new checkpoint over the transport layer usingTransportCheckpointPublisher
when the primary shard refreshes.TransportPublishShardCheckpointAction
, when received it invokesIndexShard.onNewCheckpoint
to kick off another replication withSegmentReplicationService
.Issues Resolved
closes #1544
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.