Description
Introduction
An Elasticsearch shard can receive indexing, update, and delete commands. Those changes are applied first on the primary shard, maintaining per-doc semantics, and are then replicated to all the replicas. All these operations happen concurrently. While we maintain ordering on a per-doc basis using versioning support, there is no way to order operations on different docs with respect to each other. Having such a per-shard operation ordering will enable us to implement higher-level features such as a Changes API (follow changes to documents in a shard and index) and a Reindexing API (take all data from a shard and reindex it into another, potentially mutating the data). Internally we could use this ordering to speed up shard recoveries, by identifying which specific operations need to be replayed to the recovering replica instead of falling back to a file-based sync.
To get such ordering, each operation will be assigned a unique and ever increasing Sequence Number (in short, seq#). This sequence number will be assigned on the primary and replicated to all replicas. Seq# are to be indexed in Lucene to allow sorting, range filtering etc.
Warning, research ahead
What follows in this ticket is the current thinking about how to best implement this feature. It may change in subtle or major ways as the work continues. It is important to implement this infrastructure in a way that is correct, resilient to failures, and does not slow down indexing. We feel confident about the approach described below, but we may have to backtrack or change the approach completely.
What is a Sequence
Applying an operation order on a primary is a simple question of incrementing a local counter for every operation. However, this is not sufficient to guarantee global uniqueness and monotonicity under error conditions where the primary shard can be isolated by a network partition. For those, the identity of the current primary needs to be baked into each operation. For example, late to arrive operations from an old primary can be detected and rejected.
In short, each operation is assigned two numbers:
- a `term` - this number is incremented with every primary assignment and is determined by the cluster master. This is very similar to the notion of a `term` in Raft, a `view-number` in Viewstamped Replication or an `epoch` in Zab.
- a `seq#` - this number is incremented by the primary with each operation it processes.
To achieve ordering, when comparing two operations, `o1` & `o2`, we say that `o1` < `o2` if and only if `o1.seq#` < `o2.seq#` or (`o1.seq#` == `o2.seq#` and `o1.term` < `o2.term`). Equality and greater than are defined in a similar fashion.
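The comparison rule above can be sketched in a few lines. This is only an illustration, not Elasticsearch code: the `Operation` class and its field names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Operation:
    """Hypothetical container for the two numbers assigned to an operation."""
    seq_no: int  # assigned by the primary, one per operation
    term: int    # primary term, determined by the cluster master

    def sort_key(self):
        # seq# is compared first; the term only breaks ties between
        # operations that share a seq#.
        return (self.seq_no, self.term)

    def __lt__(self, other):
        return self.sort_key() < other.sort_key()


old = Operation(seq_no=7, term=1)    # written under an older primary
new = Operation(seq_no=7, term=2)    # same seq# assigned under a newer primary
later = Operation(seq_no=8, term=1)

assert old < new       # equal seq#, lower term
assert new < later     # lower seq# sorts first regardless of term
assert sorted([later, new, old]) == [old, new, later]
```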
For reasons explained later on, we maintain for each shard copy two special seq#:
- `local checkpoint#` - this is the highest seq# for which all lower seq# have been processed. Note that this is not the highest seq# the shard has processed: because indexing is concurrent, some changes can be processed while earlier, heavier ones are still ongoing.
- `global checkpoint#` (or just `checkpoint#`) - the highest seq# for which the local shard can guarantee that all previous (included) seq# have been processed on all active shard copies (i.e., primary and replicas).
Those two numbers will be maintained in memory but also persisted in the metadata of every Lucene commit.
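To make the difference between the local checkpoint and the highest processed seq# concrete, here is a minimal in-memory sketch. The class and method names are hypothetical, and it assumes seq# start at 0; it is not the tracker used in Elasticsearch.

```python
class LocalCheckpointTracker:
    """Hypothetical tracker; assumes seq# start at 0."""

    def __init__(self):
        self.checkpoint = -1    # highest seq# with no gaps below it
        self.max_seq_no = -1    # highest seq# processed so far
        self._pending = set()   # processed seq# that sit above a gap

    def mark_processed(self, seq_no):
        if seq_no <= self.checkpoint:
            return  # already covered by the checkpoint
        self.max_seq_no = max(self.max_seq_no, seq_no)
        self._pending.add(seq_no)
        # Advance over every contiguous seq# that has now completed.
        while self.checkpoint + 1 in self._pending:
            self.checkpoint += 1
            self._pending.remove(self.checkpoint)


tracker = LocalCheckpointTracker()
for seq_no in (0, 1, 3, 4):     # seq# 2 is a slow operation still in flight
    tracker.mark_processed(seq_no)
assert (tracker.checkpoint, tracker.max_seq_no) == (1, 4)

tracker.mark_processed(2)       # the slow operation completes
assert tracker.checkpoint == 4
```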
Changes to indexing flow on primaries
Here is a sketch of the indexing code on primaries. Much of it is identical to the current logic. Changes or additions are marked in bold.
1. Validate write consistency based on routing tables.
2. Incoming indexing request is parsed first (rejected upon mapping/parsing failures).
3. Under uid lock:
   1. Versioning is resolved to a fixed version to be indexed.
   2. Operation is assigned a seq# and a term.
   3. Doc is indexed into Lucene.
   4. Doc is put into translog.
4. Replication:
   1. Failures in step 3 above are also replicated (e.g., due to failure of Lucene tokenization).
   2. Send docs to all assigned replicas.
   3. Replicas respond with their current `local checkpoint#`.
   4. When all respond (or have failed), send answer to client.
5. Checkpoint update:
   1. Update the `global checkpoint#` to the highest seq# for which all active replicas have processed all lower seq# (inclusive). This is based on information received in 4.3.
   2. If changed, send new `global checkpoint#` to replicas (can be folded into a heartbeat/next index req).
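The checkpoint update step can be illustrated as taking the minimum of the local checkpoints reported by the active copies. This is a sketch under assumptions: the function name and the dictionary shape are hypothetical, and the "never move backwards" guard is an assumption of this example rather than something the flow above spells out.

```python
def compute_global_checkpoint(current, local_checkpoints):
    """Return the new global checkpoint#.

    `local_checkpoints` maps each active shard copy (primary included) to
    the last local checkpoint# it reported. Hypothetical helper.
    """
    if not local_checkpoints:
        return current
    # Every active copy has processed everything up to its own local
    # checkpoint, so the minimum is safe on all of them.
    candidate = min(local_checkpoints.values())
    # Assumption of this sketch: the global checkpoint never moves backwards.
    return max(current, candidate)


reported = {"primary": 12, "replica-1": 9, "replica-2": 11}
assert compute_global_checkpoint(7, reported) == 9

reported["replica-1"] = 12      # the slowest replica catches up
assert compute_global_checkpoint(9, reported) == 11
```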
Changes to indexing flow on replicas
As above, this is a sketch of the indexing code on replicas. Changes from the current logic are marked in bold.
1. Validate request:
   1. Seq#'s term is >= locally known primary term.
2. Under uid lock:
   1. Index into Lucene if seq# > local copy and doesn't represent an error on primary.
   2. Add to local translog.
3. Respond with the current `local checkpoint#`.
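The replica-side steps can be sketched as follows. Everything here is hypothetical and simplified: a dictionary stands in for Lucene, a list for the translog, locking is omitted because the example is single-threaded, and seq# are assumed to start at 0.

```python
class StaleTermError(Exception):
    """Raised when an operation carries a term older than the known one."""


class ReplicaShard:
    def __init__(self, primary_term):
        self.primary_term = primary_term
        self.docs = {}          # doc id -> (seq#, source); stand-in for Lucene
        self.translog = []      # every accepted operation, in arrival order
        self.processed = set()  # all seq# seen so far

    def local_checkpoint(self):
        checkpoint = -1
        while checkpoint + 1 in self.processed:
            checkpoint += 1
        return checkpoint

    def apply(self, doc_id, source, seq_no, term, failed_on_primary=False):
        # Validate request: reject operations from an older primary.
        if term < self.primary_term:
            raise StaleTermError(f"term {term} < {self.primary_term}")
        # "Under uid lock": index only if newer than the local copy and the
        # operation does not represent a failure on the primary.
        current = self.docs.get(doc_id)
        if not failed_on_primary and (current is None or seq_no > current[0]):
            self.docs[doc_id] = (seq_no, source)
        self.translog.append((seq_no, term, doc_id))
        self.processed.add(seq_no)
        # Respond with the current local checkpoint#.
        return self.local_checkpoint()


replica = ReplicaShard(primary_term=2)
assert replica.apply("a", {"v": 2}, seq_no=1, term=2) == -1  # seq# 0 not seen yet
assert replica.apply("a", {"v": 1}, seq_no=0, term=2) == 1   # late, older op
assert replica.docs["a"] == (1, {"v": 2})                     # newer copy kept
```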
Global Checkpoint# increment on replicas
The primary advances its `global checkpoint#` based on its knowledge of its own and the replicas' `local checkpoint#`. Periodically it shares its knowledge with the replicas, which handle it as follows:
1. Validate source:
   1. Source's primary term is == locally known primary term.
2. Validate correctness:
   1. Check that all sequence# below the new `global checkpoint#` were processed and `local checkpoint#` is of the same primary term. If not, fail shard.
3. Set the shard’s copy of `global checkpoint#`, if it's lower than the incoming global checkpoint.
Note that the global checkpoint is local knowledge that is updated under the mandate of the primary. It may be that the primary's information is lagging compared to a replica's. This can happen when a replica is promoted to primary (but still has stale info).
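A minimal sketch of how a replica might handle an incoming global checkpoint follows. The names are hypothetical, and two simplifications are assumptions of this example: an update from a source with a different term is simply rejected, and the check that the local checkpoint belongs to the same primary term is left out.

```python
from dataclasses import dataclass


class ShardFailedError(Exception):
    """Stands in for failing the shard copy."""


@dataclass
class ShardCopy:
    primary_term: int
    local_checkpoint: int
    global_checkpoint: int = -1

    def update_global_checkpoint(self, source_term, incoming):
        # Validate source: only the currently known primary may update us.
        if source_term != self.primary_term:
            return False
        # Validate correctness: everything up to the incoming checkpoint
        # must already have been processed locally.
        if self.local_checkpoint < incoming:
            raise ShardFailedError("missing operations below global checkpoint")
        # Only ever advance; a lagging primary must not pull us back.
        if self.global_checkpoint < incoming:
            self.global_checkpoint = incoming
        return True


copy = ShardCopy(primary_term=3, local_checkpoint=10, global_checkpoint=6)
assert copy.update_global_checkpoint(3, 8) and copy.global_checkpoint == 8
assert copy.update_global_checkpoint(3, 5) and copy.global_checkpoint == 8
assert not copy.update_global_checkpoint(2, 9)   # old primary is rejected
```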
First use case - faster replica recovery
Having an ordering of operations allows us to speed up the recovery process of an existing replica and its synchronization with the primary. At the moment, we do a file-based sync, which typically results in over-copying of data. Having a clearly marked `checkpoint#` allows us to limit the sync operation to just those documents that have changed subsequently. In many cases we expect to have no documents to sync at all. This improvement will be tracked in a separate issue.
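As an illustration of the idea, the sketch below picks the operations a replica needs from the primary's retained operations and signals when an operation-based recovery is not possible. The function name, the dictionary shape of an operation, and the use of `None` to mean "fall back to a file-based sync" are all assumptions of this example.

```python
def plan_recovery(primary_ops, primary_max_seq_no, replica_checkpoint):
    """Return the ops to replay, or None if a file-based sync is needed.

    `primary_ops` is the list of operations the primary still retains;
    each op is a dict with at least a "seq_no" key (hypothetical shape).
    """
    needed = range(replica_checkpoint + 1, primary_max_seq_no + 1)
    available = {op["seq_no"]: op for op in primary_ops}
    if all(seq_no in available for seq_no in needed):
        return [available[seq_no] for seq_no in needed]
    return None  # some required operation is no longer retained


retained = [{"seq_no": s} for s in range(5, 10)]   # primary retains 5..9

assert plan_recovery(retained, 9, 6) == [{"seq_no": 7}, {"seq_no": 8}, {"seq_no": 9}]
assert plan_recovery(retained, 9, 9) == []         # nothing to sync
assert plan_recovery(retained, 9, 2) is None       # seq# 3 and 4 are gone
```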
Road map
Basic infra
- Introduce Primary Terms (Introduce Primary Terms #14062)
- Introduce Seq# and index them (Add Sequence Numbers and enforce Primary Terms #14651)
- Introduce local checkpoints (Introduce Local checkpoints #15390)
- Introduce global checkpoints (Introduced Global checkpoints for Sequence Numbers #15485)
- Replicate failed operations for which a seq# was assigned (@areek) Replicate write failures #23314
- Create testing infrastructure to allow testing replication as a unit test, but with real IndexShards (Introduce Replication unit tests using real shards #18930)
- Persist local and global checkpoints (Persist sequence number checkpoints #18949)
- Update to Lucene 6.0 (Sequence numbers commit data for Lucene uses Iterable interface #20793)
- Persist global checkpoint in translog commits (@jasontedor) Add global checkpoint to translog checkpoints #21254
- Reading global checkpoint from translog should be part of translog recovery (@jasontedor) Clarify global checkpoint recovery #21934
- transfer global checkpoint after peer recovery (@jasontedor) Tighten sequence numbers recovery #22212
- add translog no-op (@jasontedor) Introduce translog no-op #22291
- Handle retry on primary exceptions in shard bulk action; we currently potentially reuse the answers of the previous primary. This is needed for versioned operations and mapping conflicts (we process ops, the third op needs a mapping change, we reach out to the master, the master already processed that change from another shard and responds with nothing to do, we reparse the op but the node doesn't have the mapping changes yet, we throw a retry and re-run the previous ops as well, potentially running into conflicts) (@bleskes)
- Don't fail shards as stale if they fail a global checkpoint sync - this will happen all the time when a primary is started and wants to sync the global checkpoint. To achieve this we agreed to change the default behavior of shard failures for replication operations (with the exception of write operations, see later). If an operation fails on a replica, the primary shouldn't fail the replica or mark it as stale. Instead it should report this to the user by failing the entire operation. Write Replication Operations (i.e., subclasses of `TransportWriteAction`) should keep the current behavior. (@dakrone) Change certain replica failures not to fail the replica shard #22874
- Transfer sequence number "primary context" (i.e., in sync shards, tracked local checkpoints etc) upon primary relocation AND don't allow the old primary to operate as primary anymore (see `IndexShard#verifyPrimary`) (@jasontedor, Introduce primary context #25122)
Replica recovery (no rollback)
A best effort doc based replica recovery, based on local last commit. By best effort we refer to having no guarantees on the primary translog state and the likelihood of doc based recovery to succeed and not requiring a file sync.
- Move local checkpoint to max seq# in commit when opening engine (Tighten sequence numbers recovery #22212). We currently have no guarantee that all ops above the local checkpoint baked into the commit will be replayed. That means that delete operations with a seq# > local checkpoint will not be replayed. To work around it (for now), we will move the local checkpoint artificially (at the potential expense of correctness) (@jasontedor)
- Review correctness of POC and extract requirements for the primary side (@jasontedor) - replaced with TLA+ work
- Use seq# checkpoints for replica recovery (Introduce sequence-number-based recovery #22484, @jasontedor)
Translog seq# based API
Currently the translog keeps all operations that are not persisted into the last Lucene commit. This doesn't imply that it can serve all operations from a given seq# and up. We want to move to seq# based recovery, where a Lucene commit indicates which seq# are fully baked into it and the translog recovers from there.
- Add min/max seq# to translog generations. (Introduce sequence-number-aware translog #22822, @jasontedor)
- Add a smaller maximum generation size and automatically create new generations. Note that the total translog size can still grow to 512MB. However, dividing it into smaller pieces will allow a Lucene commit to trim the translog, even though we may need some operations from the non-current generation (see next bullet point). (Introduce translog generation rolling #23606, @jasontedor)
- Move translog trimming to use lastCommittedLocalCheckpoint and use that for recovering operations when opening a lucene index. BWC aspects (a replica on new node operating without seq#) TBD. (Preserve multiple translog generations #24015, @jasontedor)
- Adapt snapshots to dedup sequence numbers by primary term (by reading in reverse) (Dedup translog operations by reading in reverse #27268, @dnhatn)
- Allow to trim all ops above a certain seq# with a term lower than X (Allow to trim all ops above a certain seq# with a term lower than X #30176 @vladimirdolzhenko)
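The "dedup by reading in reverse" item can be illustrated with a short sketch. It relies on an assumption stated elsewhere in this ticket: operations from a newer primary always land later in the translog than any operation they conflict with, so the last entry for a given seq# is the one to keep. The function name and the dictionary shape of an operation are hypothetical.

```python
def dedup_snapshot(translog_ops):
    """Yield at most one op per seq#, preferring the most recently written.

    `translog_ops` is ordered oldest to newest; the result is ordered
    newest to oldest because the log is read in reverse.
    """
    seen = set()
    result = []
    for op in reversed(translog_ops):
        if op["seq_no"] in seen:
            continue  # an entry written later already claimed this seq#
        seen.add(op["seq_no"])
        result.append(op)
    return result


ops = [
    {"seq_no": 4, "term": 1, "doc": "a"},
    {"seq_no": 5, "term": 1, "doc": "b"},   # written by the old primary
    {"seq_no": 5, "term": 2, "doc": "c"},   # same seq#, newer primary
]
assert [(op["seq_no"], op["term"]) for op in dedup_snapshot(ops)] == [(5, 2), (4, 1)]
```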
Primary recovery (no rollback)
- After recovering from the translog, the primary should close any gaps in its history by indexing no-ops into the missing seq# (@s1monw, Fill missing sequence IDs up to max sequence ID when recovering from store #24238)
- After recovering, a primary should update its knowledge of its own local checkpoint (@ywelsch, Introduce primary/replica mode for GlobalCheckPointTracker #25468)
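Closing gaps amounts to finding every seq# up to the maximum that was never processed and indexing a no-op for it. A minimal sketch, assuming seq# start at 0 and using a hypothetical helper name:

```python
def missing_seq_nos(processed, max_seq_no):
    """Return the seq# in [0, max_seq_no] that need a no-op."""
    return [s for s in range(max_seq_no + 1) if s not in processed]


processed = {0, 1, 4, 6}
gaps = missing_seq_nos(processed, max_seq_no=6)
assert gaps == [2, 3, 5]

# Once a no-op is indexed for each gap, the history is contiguous.
processed.update(gaps)
assert missing_seq_nos(processed, max_seq_no=6) == []
```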
Primary promotion
- Implement a clean transition between replica operations on the shard (with an older term) and operations with the new term.
  - Replicas (triggered by indexing operations and live sync) (Block older operations on primary term transition #24779 @jasontedor)
  - Primary (via a cluster state update) (Introduce clean transition on primary promotion #24925 @jasontedor)
- Should trigger a translog roll over (so operations from the new primary will always have a higher translog generation than any operation they may conflict with) (Guarantee that translog generations are seqNo conflict free #24825 @bleskes)
- Should close gaps in history. Note that this doesn't imply this information will be transferred to the replicas. That is the job of the primary/replica sync. (Fill gaps on primary promotion #24945 @jasontedor)
Live replica/primary sync (no rollback)
- Add a task that streams all operations from the primary's global checkpoint to all shards. (@ywelsch Remove TranslogRecoveryPerformer #24858)
  - fails the shard on other failures (double think about shard closing)
  - triggers update of primary term on replica if needed
  - transfers shard's local checkpoint back to the new primary
  - progress reporting via the task manager infrastructure
- When a replica shard increases its primary term under the mandate of a new primary, it should also update its global checkpoint; this gives us the guarantee that its global checkpoint is at least as high as the new primary and gives a starting point for the primary/replica resync (@ywelsch, Update global checkpoint when increasing primary term on replica #25422)
- Replicas should throw back their local checkpoint to the global checkpoint when detecting a new primary. (@jasontedor, Throw back replica local checkpoint on new primary #25452)
Primary recovery with rollback
Needed to deal with discrepancies between translog and commit point that can result from a failure during primary/replica sync.
- Make sure the translog keeps data based on the oldest commit (Introducing a translog deletion policy #24950)
- A custom deletion policy to keep old commits around (@dnhatn, Keep commits and translog up to the global checkpoint #27606)
- Roll back before starting to recover from translog (Lucene's deletion policy can clean commits before opening) (@dnhatn Rollback a primary before recovering from translog #27804)
- Clean up commits once they are no longer needed. Lucene only calls the deletion policy upon commit. Since we keep older commits around based on the advancement of the global checkpoint, we can trim data earlier, i.e., once the global checkpoint has advanced enough (@dnhatn Clean up commits when global checkpoint advanced #28140).
Replica recovery with rollback
Needed to throw away potentially wrong doc versions that ended up in Lucene. Those "wrong doc versions" may still be in the translog of the replica, but since we ignore the translog on replica recovery they will be removed.
- Only use "valid" commit points as the basis for file based recovery (i.e., commit points below the global checkpoint) (@dnhatn Primary send safe commit in file-based recovery #28038)
- Only use "valid" commit points as the basis for ops based recovery (Lucene's deletion policy can clean commits before opening) (@dnhatn Replica starts peer recovery with safe commit #28181)
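One way to read "valid" commit points is sketched below: the newest commit whose highest seq# is at or below the global checkpoint is treated as the safe one, and anything older can be cleaned up. This is an interpretation for illustration only; the function name, the tuple shape, and the "at or below" boundary are assumptions of this example, not the deletion policy itself.

```python
def split_commits(commits, global_checkpoint):
    """Return (safe_commit, deletable_commits).

    `commits` is a list of (name, max_seq_no) tuples ordered oldest to
    newest. If no commit is at or below the global checkpoint, nothing is
    considered safe and nothing is deleted.
    """
    safe_index = None
    for i, (_, max_seq_no) in enumerate(commits):
        if max_seq_no <= global_checkpoint:
            safe_index = i  # keep looking for a newer valid commit
    if safe_index is None:
        return None, []
    return commits[safe_index], commits[:safe_index]


commits = [("c1", 10), ("c2", 25), ("c3", 40)]

# c3 contains operations above the global checkpoint, so c2 is the basis
# for recovery and only c1 is no longer needed.
assert split_commits(commits, 30) == (("c2", 25), [("c1", 10)])
assert split_commits(commits, 5) == (None, [])
```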
Live replica/primary sync with rollback
- Allow a shard to rollback to a seq# from before last known checkpoint# based on NRT readers
- Index all operations missing from the rollback point up to the global checkpoint from local translog
Seq# as versioning
- Change InternalEngine to resolve collisions based on seq# on replicas and recovery (@bleskes, Use sequence numbers to identify out of order delivery in replicas & recovery #24060)
- Change write API to allow specifying desired current seq# for the operation to succeed
- Make doc level versioning an opt in feature (mostly for external versioning)
Shrunk indices
Shrunk indices have mixed histories.
- The primary term of the new index should be higher than the source. (@jasontedor, Initialize primary term for shrunk indices #25307)
- When recovering the primary shard we need to set its maximum seqNo to the max of all the source shards. (@jasontedor, Initialize sequence numbers on a shrunken index #25321)
- (PS - while we're at it we can also transfer the max `max_unsafe_auto_id_timestamp` @jasontedor, Initialize max unsafe auto ID timestamp on shrink #25356)
Adopt Me
- Properly store seq# in Lucene: we expect to use the seq# for sorting, during collision checking and for doing range searches. The primary term will only be used during collision checking when the seq# of the two document copies is identical. Mapping this need to Lucene means that the seq# itself should be stored both as a numeric doc value and as a numeric indexed field (BKD). The primary term should be stored as a doc value field and doesn't need an indexed variant. We also considered the alternative of encoding both term and seq# into a single numeric/binary field as it may save on the disk lookup implied by two separate fields. Since we expect the primary term to be rarely retrieved, we opted for the simplicity of the two doc value fields solution. We also expect it to mean better compression. (@dakrone) Add internal _primary_term field to store shard's primary term #21480
- Add primary term to DocWriteResponse (@jasontedor) Add primary term to doc write response #24171
- How do we deal with the BWC aspects in the case that a primary is running on a new node while one replica is on an old node and one replica is on a new one? In that case the primary will maintain seq# and checkpoints for itself and the replica on the new node. However, if the primary fails it may be the case that the old replica is elected as primary. That means that the other replica will suddenly stop receiving sequence numbers. It is not clear if this is really a problem and, if so, what the best approach to solve it is. (@dakrone, Promote replica on the highest version node #25277)
- Introduce new shard states to indicate an ongoing primary sync on promotion. See Live primary-replica resync (no rollback) #24841 (review). We now have an alternative plan for this - see Introduce promoting index shard state #28004 (comment)
- Delay primary relocation if primary has not yet synced with replicas. See Live primary-replica resync (no rollback) #24841 (review)
TBD
- A primary that's allocated when in-sync replicas aren't can advance the global checkpoint to a region that's unsafe - the primary doesn't know whether its local ops that are above the global checkpoint actually exist on the other replicas (@ywelsch Introduce primary/replica mode for GlobalCheckPointTracker #25468).
- File based recovery (both local and remote) can recreate deleted docs. If a delete is out of order with a previous index operation, we may replay the index operation on recovery, but not the delete. This has to do with the fact that we trim the translog/capture a starting point at an arbitrary generation, and replay all of it. Current solution - change the indexing plan on non-primary origin to never index to lucene below the local checkpoint (5.x is addressed in Engine - Do not store operations that are not index into lucene in the translog (5.x only) #25592)
- How to deal with failures to sync the global checkpoint? This is interesting as we will rely on the checkpoint to be eventually updated on all shard copies.
- Throwing back the local checkpoint to the global checkpoint may leave us in a situation where we don't have all ops in the translog to do a primary/replica sync. This is because we don't have (yet) any guarantee that the translog has all ops above the global checkpoint. That shard will be problematic when promoted to primary (it doesn't have the data to do a sync), causing the replicas' local checkpoints to get stuck. This will change when we have a custom deletion policy, but we may want to double check that and fail the primary if it can't sync its replicas.
- When indexing stops, sync global checkpoint faster (now we wait on idle shard) (@jasontedor, Introduce global checkpoint background sync #26591)
- Make local checkpoint storage lazily initialized to protect against memory usage during recovery (@jasontedor, Lazy initialize checkpoint tracker bit sets #27179)
- Snapshot and restore may create a primary that violates all our checkpointing logic. We should quantify the scenarios in which this can happen and discuss appropriate solutions. A similar issue occurs with force allocate empty & stale primaries.
- When indexing on a replica, InternalEngine loads the primary term to resolve a potential conflict between two indexing operations to the same doc with the same seq#. Once we have proper rollbacks that should never be needed. Instead we should assert that the term is identical (and potentially that the doc itself is identical).
- Make replica shard allocation code aware of sequence-numbers, so that it selects a node with an existing shard copy not purely based on the number of matching Lucene segments, but also based on the number of operations that it's missing from the primary. Similarly, do the same to cancel existing recoveries if a node with an existing shard copy comes back that has better matching stats. (@dnhatn, Sequence number based replica allocation #46959)
Completed Miscellaneous
- Review feasibility of old indices (done and implemented in Add BWC layer to seq no infra and enable BWC tests #22185 ) (@bleskes)
- Remove Shadow replicas (@dakrone Remove shadow replicas #23906)
- If minimum of all local checkpoints is less than global checkpoint on the primary, do we fail the shard? No, this can happen when replicas pull back their local checkpoints to their version of the global checkpoint.
- Fail shards whose local checkpoint is lagging more than 10000 (?) ops behind the primary. This is a temporary measure to allow merging into master without closing translog gaps during primary promotion on a live shard. Those will require the replicas to pick them up, which will take a replica/primary live sync.