
[BUG] Breaking changes for CCR plugin in OS 2.0 #2482

Closed
saikaranam-amazon opened this issue Mar 16, 2022 · 45 comments
Labels
>breaking, bug, feedback needed, Plugins, v2.0.0

Comments

@saikaranam-amazon
Member

saikaranam-amazon commented Mar 16, 2022

Describe the bug
CCR depends on IndexShard#getHistoryOperations to retrieve translog operations for the leader index.
In 2.0, this method was removed in PR #2077:

 public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo, long endSeqNo)
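For context, here is a minimal sketch (not the actual plugin code) of how a consumer such as CCR would have invoked the removed API, assuming the 1.x `IndexShard`, `Engine.HistorySource`, and `Translog.Snapshot` types; the class name, "reason" string, and batch bounds are hypothetical:

```java
// Hedged sketch: read a batch of operations with the translog as the explicit
// history source, the way a 1.x fetcher could have done it.
import java.io.IOException;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;

public final class LeaderBatchFetcher1x {
    static long countOps(IndexShard shard, long fromSeqNo, long toSeqNo) throws IOException {
        long count = 0;
        try (Translog.Snapshot snapshot =
                 shard.getHistoryOperations("ccr", Engine.HistorySource.TRANSLOG, fromSeqNo, toSeqNo)) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
                count++; // a real fetcher would serialize `op` and ship it to the follower
            }
        }
        return count;
    }
}
```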

Expected behavior

  • Please provide a replacement for this method in 2.0 to retrieve operations based on the history source.
@saikaranam-amazon saikaranam-amazon added bug Something isn't working untriaged labels Mar 16, 2022
@dblock
Member

dblock commented Mar 16, 2022

@saratvemulapalli What was this deprecated for?

@saratvemulapalli
Member

@saikaranam-amazon the change was intentional because soft deletes are not optional anymore; history no longer exists in the translog and instead has to be fetched from the Lucene index.

Could you try exploring the option of using:

public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException {

Please see the conversation history on: #2077 (comment)

Give this a shot and let us know how it goes. I'll keep the issue open.
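A hedged sketch of the suggested call, under the same assumptions as the earlier snippet: the only signature change is that the `Engine.HistorySource` argument is gone, since history is now always read from the Lucene index (presumably backed by `LuceneChangesSnapshot`, per the discussion further down):

```java
// Hedged sketch of the suggested 2.0 replacement; same loop as before, but the
// history source can no longer be chosen because soft deletes are mandatory.
import java.io.IOException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;

public final class LeaderBatchFetcher2x {
    static long countOps(IndexShard shard, long fromSeqNo, long toSeqNo) throws IOException {
        long count = 0;
        try (Translog.Snapshot snapshot = shard.getHistoryOperations("ccr", fromSeqNo, toSeqNo)) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
                count++; // operations are returned in sequence-number order
            }
        }
        return count;
    }
}
```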

@saratvemulapalli saratvemulapalli added >breaking Identifies a breaking change. v2.0.0 Version 2.0.0 Plugins and removed untriaged labels Mar 17, 2022
@saikaranam-amazon
Member Author

saikaranam-amazon commented Mar 18, 2022

@saratvemulapalli Please see this issue for context: #1100
We used this method to fetch the operations with the translog as the source, and we can't fetch them from the Lucene index (due to the reasons in the above issue).

Please provide an option to fetch the translog operations based on the history source.

@saikaranam-amazon
Member Author

soft deletes are not optional anymore; history no longer exists in the translog and instead has to be fetched from the Lucene index.

Yes, that was the case previously as well. We have modified it to retain the translog. The new method is not an alternative to the previous one, as they don't give the same performance.

@saikaranam-amazon
Member Author

Gentle reminder - any update?

@dblock
Member

dblock commented Mar 21, 2022

@saikaranam-amazon Does the option suggested above by @saratvemulapalli not work?

@saikaranam-amazon
Member Author

No @dblock, we will need the previous method for the reason above.

@dblock
Member

dblock commented Mar 21, 2022

@saikaranam-amazon What do you propose?

@andrross @kartg Could I please get you two to pitch in here, what should we do?

@saikaranam-amazon
Member Author

Please add the previous method back to support retrieval based on the history source. Peer recovery can still use the new method.

@saratvemulapalli
Member

saratvemulapalli commented Mar 22, 2022

@saikaranam-amazon from what I understand, it looks like we are not writing to the translog anymore.
Adding back the method would not help. Again, this is with my limited knowledge.

From what you are saying, there is definitely a performance impact when using Lucene indices, but it feels like that's the only way.

@nknize is our understanding correct, what would you suggest?

@saratvemulapalli
Member

@saikaranam-amazon also, in terms of numbers, do you happen to know how much worse reading from the Lucene index would be vs. reading from the translog?

@saikaranam-amazon
Member Author

@saratvemulapalli Yes, captured in this issue: #1100

Problem
Retention leases preserve operations for each shard at the Lucene level (used as part of peer recovery within the cluster).
During performance benchmarking of the replication feature (for high indexing workloads), fetching the latest operations from the leader cluster showed a CPU impact of up to ~8-10% due to Lucene stored-fields decompression.

@nknize
Collaborator

nknize commented Mar 30, 2022

due to Lucene stored fields decompression.

What version of Lucene did you benchmark? /cc @andrross

For clarification, operations are still available and can be replayed; they are just stored in a Lucene index instead of a translog file, hence "HistorySource" being removed as an option.

@nknize
Collaborator

nknize commented Mar 30, 2022

I just checked the commit. You were benchmarking with Lucene 8.9, which had a performance regression in stored-fields decompression that was fixed in Lucene 8.10.

I suggest updating CCR to use the Lucene index source changes and rerun benchmarks.

@nknize nknize added the feedback needed Issue or PR needs feedback label Mar 31, 2022
@saikaranam-amazon
Member Author

Ack. Let me check and run with 8.10

@saikaranam-amazon
Member Author

saikaranam-amazon commented Apr 7, 2022

For 2.0 core: ran with the latest 2.0.0 (alpha) artifacts (April 6th, 2022)
For 1.3 core: picked the latest release artifacts

Instance: i3.xlarge
Esrally dataset: noaa (global daily weather measurements from NOAA)

Test setup:

  • In order to log only replication-specific operations:
    • Ensured a single batch is fetched during the following phase
    • CPU and total time are captured

Seeing a regression of ~10% (avg) and a difference in total time of > ~15 min to fetch the same number of operations in the 2.0 code path.

Reference (consider avg. CPU and total time for the fetch operations from the leader index):
2.0.0
Screenshot 2022-04-07 at 1 36 16 PM

1.3.1
Screenshot 2022-04-07 at 1 39 07 PM

cc: @nknize

@nknize
Collaborator

nknize commented Apr 7, 2022

Esrally dataset: noaa (Global daily weather measurements from NOAA )

Are you using the OpenSearch default codec, or are you relying on rally to set the codec for you?

@saikaranam-amazon
Member Author

No, ESRally didn't set the codec.

The index is created with the default codec.

@nknize
Collaborator

nknize commented Apr 7, 2022

The index is created with the default codec.

Can you link to the line where you're setting the codec?

Did you run with any other tracks besides the NOAA set?

@saikaranam-amazon
Member Author

saikaranam-amazon commented Apr 7, 2022

No, I didn't set any codec for the index; it uses the default codec that every new index picks up on creation in OpenSearch.

@nknize
Collaborator

nknize commented Apr 7, 2022

Just a couple thoughts here.

  1. Let's explicitly set "index.codec" : "DEFAULT" just to rule out any possibility of not using the BEST_SPEED codec (a minimal settings sketch follows this list).
  2. We need to get a bigger picture of the benchmarks. Let's get a comparison of disk usage when using the translog vs. the Lucene index as the history source, to show the trade-off between disk storage cost and CPU performance.
  3. Let's run with a few different datasets to see whether the regression is consistent across the board and not specific to NOAA. Can you start with the event data from the original post, then try nyc_taxi, so, noaa, and nested?
  4. Can you include a benchmark table like the one included in the original issue?
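A minimal sketch of point 1, assuming the OpenSearch 2.x Java high-level REST client; the index name and shard counts mirror the benchmark setup described later in this thread, and "default" is the lowercase settings value corresponding to the BEST_SPEED codec:

```java
// Hedged sketch: pin "index.codec" explicitly at index creation so the benchmark
// is guaranteed to run on the BEST_SPEED (default) stored-fields codec.
// Client classes follow the OpenSearch 2.x high-level REST client; the index name is hypothetical.
import java.io.IOException;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.common.settings.Settings;

public final class BenchmarkIndexSetup {
    static void createLeaderIndex(RestHighLevelClient client) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("noaa")
            .settings(Settings.builder()
                .put("index.number_of_shards", 1)
                .put("index.number_of_replicas", 0)
                .put("index.codec", "default")); // "best_compression" would select the DEFLATE codec instead
        client.indices().create(request, RequestOptions.DEFAULT);
    }
}
```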

@saikaranam-amazon
Member Author

saikaranam-amazon commented Apr 12, 2022

LuceneChangesSnapshot already retrieves the ops sorted by sequence number. Why all the transient ArrayLists and extra logic? Seems unnecessary.

This was used for the translog logic and not for the new method from OpenSearch core.
Updated the method to remove any inconsistencies before running the tests.

Just touching base on this issue. The best approach for the project at this juncture is to leave it as is, especially since the 2.0 preview is right around the corner.

I would like to propose another option: add the methods back to the core package for 2.0 and have a path for deprecation in a later 2.x release.

Regarding the previous runs and various datasets used:

  • The index was set with the default codec, which is tuned for best speed
  • Attaching the results for noaa with esrally metrics; other datasets are running in parallel and I will update the issue as results become available
  • Another data point: we already have the event data (from the previous run in the linked issue) and NOAA (in the current issue) exhibiting similar behavior
    • In both cases, the index is tuned for best_speed and not for compression (deflate codec)

Setup is the same as before:

  • 1 primary, 0 replicas
  • Index codec setting set to default
  • Started replication on the indices and captured the esrally metrics below
  • 2.0

------------------------------------------------------
    _______             __   _____
   / ____(_)___  ____ _/ /  / ___/_________  ________
  / /_  / / __ \/ __ `/ /   \__ \/ ___/ __ \/ ___/ _ \
 / __/ / / / / / /_/ / /   ___/ / /__/ /_/ / /  /  __/
/_/   /_/_/ /_/\__,_/_/   /____/\___/\____/_/   \___/
------------------------------------------------------
            
|                                                         Metric |   Task |    Value |   Unit |
|---------------------------------------------------------------:|-------:|---------:|-------:|
|                     Cumulative indexing time of primary shards |        |  89.2113 |    min |
|             Min cumulative indexing time across primary shards |        |        0 |    min |
|          Median cumulative indexing time across primary shards |        |  44.6057 |    min |
|             Max cumulative indexing time across primary shards |        |  89.2113 |    min |
|            Cumulative indexing throttle time of primary shards |        |        0 |    min |
|    Min cumulative indexing throttle time across primary shards |        |        0 |    min |
| Median cumulative indexing throttle time across primary shards |        |        0 |    min |
|    Max cumulative indexing throttle time across primary shards |        |        0 |    min |
|                        Cumulative merge time of primary shards |        |  18.1808 |    min |
|                       Cumulative merge count of primary shards |        |       28 |        |
|                Min cumulative merge time across primary shards |        |        0 |    min |
|             Median cumulative merge time across primary shards |        |   9.0904 |    min |
|                Max cumulative merge time across primary shards |        |  18.1808 |    min |
|               Cumulative merge throttle time of primary shards |        |  7.59582 |    min |
|       Min cumulative merge throttle time across primary shards |        |        0 |    min |
|    Median cumulative merge throttle time across primary shards |        |  3.79791 |    min |
|       Max cumulative merge throttle time across primary shards |        |  7.59582 |    min |
|                      Cumulative refresh time of primary shards |        |  1.24653 |    min |
|                     Cumulative refresh count of primary shards |        |       54 |        |
|              Min cumulative refresh time across primary shards |        |        0 |    min |
|           Median cumulative refresh time across primary shards |        | 0.623267 |    min |
|              Max cumulative refresh time across primary shards |        |  1.24653 |    min |
|                        Cumulative flush time of primary shards |        |  4.16113 |    min |
|                       Cumulative flush count of primary shards |        |       26 |        |
|                Min cumulative flush time across primary shards |        |        0 |    min |
|             Median cumulative flush time across primary shards |        |  2.08057 |    min |
|                Max cumulative flush time across primary shards |        |  4.16113 |    min |
|                                             Total Young Gen GC |        |  194.223 |      s |
|                                               Total Old Gen GC |        |    0.305 |      s |
|                                                     Store size |        |  7.29137 |     GB |
|                                                  Translog size |        | 0.524542 |     GB |
|                                         Heap used for segments |        |        0 |     MB |
|                                       Heap used for doc values |        |        0 |     MB |
|                                            Heap used for terms |        |        0 |     MB |
|                                            Heap used for norms |        |        0 |     MB |
|                                           Heap used for points |        |        0 |     MB |
|                                    Heap used for stored fields |        |        0 |     MB |
|                                                  Segment count |        |       72 |        |
|                                                 Min Throughput |  index |    23000 | docs/s |
|                                              Median Throughput |  index |  23436.3 | docs/s |
|                                                 Max Throughput |  index |  24577.6 | docs/s |
|                                        50th percentile latency |  index |  1599.55 |     ms |
|                                        90th percentile latency |  index |  1986.58 |     ms |
|                                        99th percentile latency |  index |  4399.13 |     ms |
|                                      99.9th percentile latency |  index |    10353 |     ms |
|                                       100th percentile latency |  index |  13417.4 |     ms |
|                                   50th percentile service time |  index |  1599.55 |     ms |
|                                   90th percentile service time |  index |  1986.58 |     ms |
|                                   99th percentile service time |  index |  4399.13 |     ms |
|                                 99.9th percentile service time |  index |    10353 |     ms |
|                                  100th percentile service time |  index |  13417.4 |     ms |
|                                                     error rate |  index |        0 |      % |


----------------------------------
[INFO] SUCCESS (took 1473 seconds)
----------------------------------

  • 1.3
------------------------------------------------------
    _______             __   _____
   / ____(_)___  ____ _/ /  / ___/_________  ________
  / /_  / / __ \/ __ `/ /   \__ \/ ___/ __ \/ ___/ _ \
 / __/ / / / / / /_/ / /   ___/ / /__/ /_/ / /  /  __/
/_/   /_/_/ /_/\__,_/_/   /____/\___/\____/_/   \___/
------------------------------------------------------
            
|                                                         Metric |   Task |      Value |   Unit |
|---------------------------------------------------------------:|-------:|-----------:|-------:|
|                     Cumulative indexing time of primary shards |        |    75.3275 |    min |
|             Min cumulative indexing time across primary shards |        |          0 |    min |
|          Median cumulative indexing time across primary shards |        |    37.6638 |    min |
|             Max cumulative indexing time across primary shards |        |    75.3275 |    min |
|            Cumulative indexing throttle time of primary shards |        |          0 |    min |
|    Min cumulative indexing throttle time across primary shards |        |          0 |    min |
| Median cumulative indexing throttle time across primary shards |        |          0 |    min |
|    Max cumulative indexing throttle time across primary shards |        |          0 |    min |
|                        Cumulative merge time of primary shards |        |    17.3825 |    min |
|                       Cumulative merge count of primary shards |        |         26 |        |
|                Min cumulative merge time across primary shards |        |          0 |    min |
|             Median cumulative merge time across primary shards |        |    8.69126 |    min |
|                Max cumulative merge time across primary shards |        |    17.3825 |    min |
|               Cumulative merge throttle time of primary shards |        |    6.83275 |    min |
|       Min cumulative merge throttle time across primary shards |        |          0 |    min |
|    Median cumulative merge throttle time across primary shards |        |    3.41637 |    min |
|       Max cumulative merge throttle time across primary shards |        |    6.83275 |    min |
|                      Cumulative refresh time of primary shards |        |   0.800183 |    min |
|                     Cumulative refresh count of primary shards |        |         46 |        |
|              Min cumulative refresh time across primary shards |        |          0 |    min |
|           Median cumulative refresh time across primary shards |        |   0.400092 |    min |
|              Max cumulative refresh time across primary shards |        |   0.800183 |    min |
|                        Cumulative flush time of primary shards |        |    4.12702 |    min |
|                       Cumulative flush count of primary shards |        |         27 |        |
|                Min cumulative flush time across primary shards |        |          0 |    min |
|             Median cumulative flush time across primary shards |        |    2.06351 |    min |
|                Max cumulative flush time across primary shards |        |    4.12702 |    min |
|                                             Total Young Gen GC |        |    149.146 |      s |
|                                               Total Old Gen GC |        |      0.224 |      s |
|                                                     Store size |        |    6.72589 |     GB |
|                                                  Translog size |        |   0.522197 |     GB |
|                                         Heap used for segments |        |    1.21087 |     MB |
|                                       Heap used for doc values |        |   0.450932 |     MB |
|                                            Heap used for terms |        |   0.718445 |     MB |
|                                            Heap used for norms |        | 0.00012207 |     MB |
|                                           Heap used for points |        |          0 |     MB |
|                                    Heap used for stored fields |        |  0.0413742 |     MB |
|                                                  Segment count |        |         67 |        |
|                                                 Min Throughput |  index |    21333.4 | docs/s |
|                                              Median Throughput |  index |    26861.7 | docs/s |
|                                                 Max Throughput |  index |    27170.9 | docs/s |
|                                        50th percentile latency |  index |    1301.35 |     ms |
|                                        90th percentile latency |  index |    1727.66 |     ms |
|                                        99th percentile latency |  index |    4972.26 |     ms |
|                                      99.9th percentile latency |  index |    9734.46 |     ms |
|                                       100th percentile latency |  index |    10881.1 |     ms |
|                                   50th percentile service time |  index |    1301.35 |     ms |
|                                   90th percentile service time |  index |    1727.66 |     ms |
|                                   99th percentile service time |  index |    4972.26 |     ms |
|                                 99.9th percentile service time |  index |    9734.46 |     ms |
|                                  100th percentile service time |  index |    10881.1 |     ms |
|                                                     error rate |  index |          0 |      % |


----------------------------------
[INFO] SUCCESS (took 1263 seconds)
----------------------------------

@nknize
Collaborator

nknize commented Apr 12, 2022

Thank you for posting this @saikaranam-amazon

A few quick questions:

  1. Did you index the same number of documents in both runs?
  2. For the event data benchmark did you explicitly set the codec or leave it alone per the run?

@nknize
Collaborator

nknize commented Apr 12, 2022

add the methods back to the core package for 2.0 and have the path for deprecation in later 2.x release.

Since public preview is right around the corner and CCR will need to migrate to segrep for 3.0+ anyway, how about a middle ground for 2.0+:

  1. Open the PR to add back deprecated TLog methods for 2.0 release only
  2. Refactor CCR as a core module/plugin for 2.1+ and refactor deprecated methods to the CCR module/plugin
  3. Dig into benchmarks to ensure validity
    a. based on benchmark findings either i. keep deprecated methods in the new CCR core module/plugin for all of 2.x or ii. remove altogether in favor of LuceneChangesSnapshot.
  4. Migrate CCR to segrep in 3.0+

Feel free to thumb up #2872 if you agree.

@mikemccand

Sorry, I am new to the CCR implementation, but my biggest question is: why would we rely on replaying indexing operations from the xlog to achieve CCR? Shouldn't we rather replicate the delta of Lucene segments (changed since the last CCR sync), which already did the costly work of indexing all docs?

@saikaranam-amazon
Member Author

Open the PR to add back deprecated TLog methods for 2.0 release only

Thanks @nknize. Published PR: #2886

@dblock
Member

dblock commented Apr 13, 2022

@saikaranam-amazon please also confirm that you agree with the entire plan above, especially #2872 (#2886 (comment)). If we don't do this, I'd rather not reintroduce the deprecated methods and instead ship with the regression.

@CEHENKLE
Member

Concur with dB. We do not want to add technical debt without a plan to remove it.

@Pallavi-AWS
Member

Pallavi-AWS commented Apr 14, 2022

@dblock @nknize Thanks for the interim workaround of #2886. We will have a plan in place to move away from the deprecated method in OS 2.1 (refactoring core and the plugin will need some design work). However, for merging CCR into core, we can explore options for OS 3.0 (Java migration, segrep adoption, etc.) - we might have to settle on a hybrid model where we continue to have functionality like security in the replication plugin and move replication engine to core. We also need to keep longer-term items such as active-active replication in mind while rearchitecting.

@krishna-ggk

krishna-ggk commented Apr 14, 2022

Sorry, I am new to the CCR implementation, but my biggest question is: why would we rely on replaying indexing operations from the xlog to achieve CCR? Shouldn't we rather replicate the delta of Lucene segments (changed since the last CCR sync), which already did the costly work of indexing all docs?

Thanks for the feedback Mike. There were a bunch of considerations for CCR to rely on translog at the time.

  1. Ability to filter out documents to be replicated
  2. Reliance on refreshes which may have implications to some usecases
  3. Since segments on primary and replica are not homogeneous, replica promotions due to node losses etc will need special handling with more complexity.
  4. Further due to same reason as 3, only a designated shard (primary/replica) can replicate segments - which means segment transfer cannot be load balanced if required.
  5. Having different replication models for same cluster replicas vs different cluster (at the time) for the same index implies inheriting cons from both models.

That said, we totally realize the immense gains we can get with segment replication (thanks to your blogs as well as other success stories.) With segment replication making its way into OpenSearch for same-cluster replicas, it is a good time to visit CCR based on segrep.

@nknize
Collaborator

nknize commented Apr 14, 2022

for merging CCR into core, we can explore options for OS 3.0...

We need to do this in 2.x instead of opening more core extension points to override mechanisms we shouldn't be overriding; we've already been burnt by that once, when k-NN and CCR were incompatible due to both overriding the Engine.

we might have to settle on a hybrid model where we continue to have functionality like security in the replication plugin and move replication engine to core.

As a first step we can refactor CCR to core and secure the endpoints separately in the security module. This fits with the design of core in 1.x/2.x and the secure endpoints will refactor to core once security becomes a first class citizen.

it is a good time to visit CCR based on segrep.

💯 We can implement in a later phase but CCR should be baked into the design consideration of segrep from the beginning to minimize the early tech debt surface area /cc @mch2 @kartg @anasalkouz @mikemccand

@nknize
Collaborator

nknize commented Apr 14, 2022

Ability to filter out documents to be replicated

Just dropping some thoughts out loud on this one:

This sounds like a good use-case fit for the streaming index api (future home for the xlog and user defined durability policies). One idea is to have CCR filters in the streaming index api route copies of indexing requests to the follower cluster if filter is matched. Another idea, there could be co-located IndexWriters in the streaming index api of the leader cluster that filter the documents and flush follower segments to be replicated by segrep of the follower cluster only.

In the former case you're using a form of docrep for inter-cluster replication but paying the penalty for additional network traffic (and possible failures). In the latter you're parallelizing the writes on the leader node and using segrep for inter and intra-cluster replication. Incidentally this also provides the "fail early" protection we have today by indexing in memory and only replicating when flushing.

@dblock
Member

dblock commented Apr 19, 2022

@saikaranam-amazon Care to add a summary with links to various PRs and close?

@mikemccand

Sorry, I am new to the CCR implementation, but my biggest question is: why would we rely on replaying indexing operations from the xlog to achieve CCR? Shouldn't we rather replicate the delta of Lucene segments (changed since the last CCR sync), which already did the costly work of indexing all docs?

Thanks for the feedback Mike. There were a bunch of considerations for CCR to rely on translog at the time.

+1, thanks for the details. More replies below:

  1. Ability to filter out documents to be replicated

Lucene has a powerful FilterLeafReader that works really well for this. You can far more efficiently filter on the post-indexed form, segment by segment, instead of the raw original input source docs.
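To make that concrete, here is a minimal sketch of the FilterLeafReader idea, assuming the caller has already computed a per-segment `allowed` mask for the documents that may be replicated (how that mask is built is hypothetical and out of scope):

```java
// Hedged sketch: wrap a segment reader and hide documents that should not be
// replicated by masking liveDocs. Not CCR code; the `allowed` mask is hypothetical.
// Note: numDocs() still reports the wrapped reader's count, which is fine for a sketch.
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.util.Bits;

public class ReplicationFilterReader extends FilterLeafReader {
    private final Bits allowed; // per-document mask: true = may be replicated

    public ReplicationFilterReader(LeafReader in, Bits allowed) {
        super(in);
        this.allowed = allowed;
    }

    @Override
    public Bits getLiveDocs() {
        Bits live = in.getLiveDocs(); // null means no deletions in this segment
        return new Bits() {
            @Override
            public boolean get(int docId) {
                return (live == null || live.get(docId)) && allowed.get(docId);
            }

            @Override
            public int length() {
                return in.maxDoc();
            }
        };
    }

    @Override
    public CacheHelper getCoreCacheHelper() {
        return null; // the filtered view must not share the wrapped reader's cache key
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        return null;
    }
}
```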

  1. Reliance on refreshes which may have implications to some usecases

That's a good point -- if you somehow need CCR more frequently than refreshes, then replicating segments might be an issue. But does this really happen in practice?

  1. Since segments on primary and replica are not homogeneous, replica promotions due to node losses etc will need special handling with more complexity.

Actually this is a big benefit of NRT segment replication -- the primary and replica ARE homogeneous. If replica is promoted, due to either primary node down, or simple load balancing freedom in the cluster (master can pick a different replica that has colder CPU than the hot primary to protect overall cluster throughput), it's the exact same segments. This is the same reason why scan/scroll can also be safely routed to any primary or its replicas with no loss of accuracy.

  1. Further due to same reason as 3, only a designated shard (primary/replica) can replicate segments - which means segment transfer cannot be load balanced if required.

NRT segment replication fixes that too -- it can load balance nearly arbitrarily across primary + its replicas on every "pull next bytes" request. Just needs the right lease logic to keep the point-in-time snapshots lit.

  1. Having different replication models for same cluster replicas vs different cluster (at the time) for the same index implies inheriting cons from both models.

I would rather we deprecate the inefficient and fragile "reindex all docs" approach and encourage users to switch to NRT segment replication. I agree keeping both options long-term is a bad choice.

That said, we totally realize the immense gains we can get with segment replication (thanks to your blogs as well as other success stories.) With segment replication making its way into OpenSearch for same-cluster replicas, it is a good time to visit CCR based on segrep.

Thanks. I've long felt (decade at least) that it was crazy that ES chose document replication. Lucene is naturally write-once at the file level and that makes for an obvious replication unit. And it yields awesome transactional semantics to users, point-in-time searching and efficient incremental backups, like ZFS's choice of write-once block level storage yielding efficient whole-filesystem snapshots to users.

@nknize
Collaborator

nknize commented Apr 19, 2022

Lucene has a powerful FilterLeafReader that works really well for this.

This is true! I wonder then, w/ segment replication is there even a need for CCR? Couldn't we just snapshot in the leader cluster and restore in the follower and use FLR on the follower to filter unneeded docs?

Is there a use case I'm missing where CCR is needed at all if we're using a segment level replication design? Seems more appropriate for document level.

@dblock
Member

dblock commented Apr 19, 2022

@Pallavi-AWS ^

@mikemccand

Lucene has a powerful FilterLeafReader that works really well for this.

This is true! I wonder then, w/ segment replication is there even a need for CCR? Couldn't we just snapshot in the leader cluster and restore in the follower and use FLR on the follower to filter unneeded docs?

Is there a use case I'm missing where CCR is needed at all if we're using a segment level replication design? Seems more appropriate for document level.

+1 to explore this.

Let the cloud storage layer (S3 or whatever) handle the geo-distribution of the bits. Snapshot in one geo and restore in the other.

@krishna-ggk

krishna-ggk commented Apr 20, 2022

it is a good time to visit CCR based on segrep.

💯 We can implement in a later phase but CCR should be baked into the design consideration of segrep from the beginning to minimize the early tech debt surface area /cc @mch2 @kartg @anasalkouz @mikemccand

@nknize - +1 - opened a dedicated github issue to explore this in more detail.


@mikemccand - Thanks, let me address the points below.

  1. Ability to filter out documents to be replicated

Lucene has a powerful FilterLeafReader that works really well for this. You can far more efficiently filter on the post-indexed form, segment by segment, instead of the raw original input source docs.

True, definitely worth considering. One of the reasons for considering filtering at the source while replicating was to provide a security posture where filtered-out documents don't leave the cluster. However, given that the current CCR doesn't implement filtering yet, we could seek input from the community in the RFC to see if there is a real need for this use case.

  1. Reliance on refreshes which may have implications to some usecases

That's a good point -- if you somehow need CCR more frequently than refreshes, then replicating segments might be an issue. But does this really happen in practice?

Since CCR targets disaster recovery use cases, the thought was to ensure acked writes are replicated as soon as possible.

  1. Since segments on primary and replica are not homogeneous, replica promotions due to node losses etc will need special handling with more complexity.

Actually this is a big benefit of NRT segment replication -- the primary and replica ARE homogeneous. If replica is promoted, due to either primary node down, or simple load balancing freedom in the cluster (master can pick a different replica that has colder CPU than the hot primary to protect overall cluster throughput), it's the exact same segments. This is the same reason why scan/scroll can also be safely routed to any primary or its replicas with no loss of accuracy.

  1. Further due to same reason as 3, only a designated shard (primary/replica) can replicate segments - which means segment transfer cannot be load balanced if required.

NRT segment replication fixes that too -- it can load balance nearly arbitrarily across primary + its replicas on every "pull next bytes" request. Just needs the right lease logic to keep the point-in-time snapshots lit.

Yes, agreed. However, since we didn't have a segment replication implementation for same-cluster replicas at the time, we were limited by the cons of non-homogeneous segments.

  1. Having different replication models for same cluster replicas vs different cluster (at the time) for the same index implies inheriting cons from both models.

I would rather we deprecate the inefficient and fragile "reindex all docs" approach and encourage users to switch to NRT segment replication. I agree keeping both options long-term is a bad choice.

+1

That said, we totally realize the immense gains we can get with segment replication (thanks to your blogs as well as other success stories.) With segment replication making its way into OpenSearch for same-cluster replicas, it is a good time to visit CCR based on segrep.

Thanks. I've long felt (decade at least) that it was crazy that ES chose document replication. Lucene is naturally write-once at the file level and that makes for an obvious replication unit. And it yields awesome transactional semantics to users, point-in-time searching and efficient incremental backups, like ZFS's choice of write-once block level storage yielding efficient whole-filesystem snapshots to users.

Yes. The gains demonstrated in segment replication look great.

@Pallavi-AWS
Member

Pallavi-AWS commented Apr 20, 2022

Hi team, as this issue was tracking 2.0 CCR support (which is resolved), can we go ahead and close it? We will continue to discuss core migration under #2872 and segrep under #3020. @saikaranam-amazon, can we please open one more issue on removing the dependency on getHistory in 2.1? Thanks, all.

@krishna-ggk

Ability to filter out documents to be replicated

Just dropping some thoughts out loud on this one:

This sounds like a good use-case fit for the streaming index api (future home for the xlog and user defined durability policies). One idea is to have CCR filters in the streaming index api route copies of indexing requests to the follower cluster if filter is matched. Another idea, there could be co-located IndexWriters in the streaming index api of the leader cluster that filter the documents and flush follower segments to be replicated by segrep of the follower cluster only.

In the former case you're using a form of docrep for inter-cluster replication but paying the penalty for additional network traffic (and possible failures). In the latter you're parallelizing the writes on the leader node and using segrep for inter and intra-cluster replication. Incidentally this also provides the "fail early" protection we have today by indexing in memory and only replicating when flushing.

Lucene has a powerful FilterLeafReader that works really well for this.

This is true! I wonder then, w/ segment replication is there even a need for CCR? Couldn't we just snapshot in the leader cluster and restore in the follower and use FLR on the follower to filter unneeded docs?
Is there a use case I'm missing where CCR is needed at all if we're using a segment level replication design? Seems more appropriate for document level.

+1 to explore this.

Let the cloud storage layer (S3 or whatever) handle the geo-distribution of the bits. Snapshot in one geo and restore in the other.

@nknize @mikemccand Thanks. Nice discussion and thoughts - continuing the discussion further in 373.

@saikaranam-amazon
Member Author

saikaranam-amazon commented Apr 20, 2022

A few quick questions:
Did you index the same number of documents in both runs?
For the event data benchmark did you explicitly set the codec or leave it alone per the run?

Yes, we have the same documents across the runs, and we did set the codec optimized for speed in both runs.
We already have a task, currently in the review phase, to automate the performance runs. This should address all the concerns for future runs.

Refactor CCR as a core module/plugin for 2.1+ and refactor deprecated methods to the CCR module/plugin

Sure. Opened an issue under CCR to track this: opensearch-project/cross-cluster-replication#375

Dig into benchmarks to ensure validity
a. based on benchmark findings either i. keep deprecated methods in the new CCR core module/plugin for all of 2.x or ii. remove altogether in favor of LuceneChangesSnapshot.

Understood. Based on the previous runs and the subsequent runs performed for this issue (#2482), we do see a performance impact, and we have already opened an issue to track the work of providing extension points (opensearch-project/cross-cluster-replication#375).

Migrate CCR to segrep in 3.0+

The analysis and the next steps will be captured in #3020 and discussion can be continued in that issue.

@saikaranam-amazon
Member Author

saikaranam-amazon commented Apr 20, 2022

for merging CCR into core, we can explore options for OS 3.0...

We need to do this in 2.x instead of opening more core extension points to override mechanisms we shouldn't be overriding; we've already been burnt by that once, when k-NN and CCR were incompatible due to both overriding the Engine.

we might have to settle on a hybrid model where we continue to have functionality like security in the replication plugin and move replication engine to core.

As a first step we can refactor CCR to core and secure the endpoints separately in the security module. This fits with the design of core in 1.x/2.x and the secure endpoints will refactor to core once security becomes a first class citizen.

Agree with the eventual state for CCR. We will address the plugin migration (to core as a module) and segment replication in upcoming releases, as migration to core will require a lot of effort (Kotlin/Java migration, security module, etc.) and planning around the already scheduled 2.x releases.

We will continue the discussion on moving the plugin into core as a module in #2872 and on segment replication for CCR in #3020.
For 2.x, we will explore the option of extension points under opensearch-project/cross-cluster-replication#375.

@saikaranam-amazon
Member Author

saikaranam-amazon commented Apr 20, 2022

@saikaranam-amazon Care to add a summary with links to various PRs and close?

For 2.0, we have added back the deprecated translog operations under the engine, and PR #2886 is merged for 2.0. Based on the further discussion, we have the following issues for the different tracks:

  • Moving the CCR plugin into core as a module: #2872
  • Segment replication for CCR: #3020
  • Extension points for 2.x: opensearch-project/cross-cluster-replication#375

Please continue the relevant discussions in the above issues.

Closing this one.
