Use sequential stored field reader in LuceneSyntheticSourceChangesSnapshot #121636

Open
martijnvg wants to merge 10 commits into main
Conversation

@martijnvg (Member) commented Feb 4, 2025:

Without a change like this, synthetic recovery source is ~2 times slower than stored recovery source. See this dashboard, which shows total read time in the follower cluster broken down by shard: https://esbench-metrics.kb.us-east-2.aws.elastic-cloud.com:9243/app/r/s/pFChT

Baseline is stored recovery source and contender is synthetic recovery source. The benchmark is performed using a new elastic/logs challenge (elastic/rally-tracks#734) that configures auto-following of logs-* into the local cluster.

After investigation, the difference in operation read time between baseline and contender turned out to be caused by decompressing stored fields over and over again:

[image: profiling output showing repeated stored field block decompression]

(This happened because, for each operation, a stored field block gets decompressed but only the stored fields for one document are read. The next operation is then very likely to decompress the same stored field block again.)
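For illustration, here is a minimal sketch (not the actual snapshot code) of the per-operation access pattern described above; the reader, doc ids and visitor are assumed inputs. With a non-sequential stored fields reader, each call may locate and decompress the containing block again, even when consecutive doc ids share a block.

```java
import java.io.IOException;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.StoredFields;

// Sketch: fetch stored fields one doc at a time, as each operation is recreated.
static void loadPerOperation(LeafReader reader, int[] requestedDocIds, StoredFieldVisitor visitor) throws IOException {
    StoredFields storedFields = reader.storedFields();
    for (int docId : requestedDocIds) {          // doc ids in increasing order
        storedFields.document(docId, visitor);   // block lookup + decompression per call
    }
}
```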

The same benchmark with this change:
https://esbench-metrics.kb.us-east-2.aws.elastic-cloud.com:9243/app/r/s/kVgM2

With the change, synthetic recovery source is more than 2 times faster compared to stored recovery source. This matches what we observed in earlier ad-hoc / micro benchmarks.

Labelling this as a non-issue. This is a performance bug, but synthetic recovery source hasn't been released yet.

@martijnvg added the :Distributed Indexing/Recovery and :StorageEngine/Logs labels Feb 4, 2025
- this.storedFieldLoader = StoredFieldLoader.create(false, storedFields);
+ String codec = EngineConfig.INDEX_CODEC_SETTING.get(mapperService.getIndexSettings().getSettings());
+ boolean shouldForceSequentialReader = CodecService.BEST_COMPRESSION_CODEC.equals(codec);
+ this.storedFieldLoader = StoredFieldLoader.create(false, storedFields, shouldForceSequentialReader);
martijnvg (Member Author):

Maybe we should always force a sequential reader. I think that no matter the stored field format, it has benefits given how LuceneSyntheticSourceChangesSnapshot accesses stored fields.

Contributor:

I am not entirely sure. Doesn't this depend on how jumpy the data is (which, with index sorting, could be very jumpy), and on whether there is more than one stored field per doc (not sure that is always true)?

martijnvg (Member Author):

Right, that is why I then changed the logic to no longer look at the index.codec setting.

@martijnvg marked this pull request as ready for review February 4, 2025 09:53
@martijnvg requested review from jimczi, dnhatn and tlrx February 4, 2025 09:53
@elasticsearchmachine added the Team:StorageEngine and Team:Distributed Indexing labels Feb 4, 2025
@elasticsearchmachine (Collaborator):
Pinging @elastic/es-storage-engine (Team:StorageEngine)

@elasticsearchmachine (Collaborator):
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@tlrx (Member) commented Feb 4, 2025:

@martijnvg looks like both links point to the same benchmark?

@martijnvg (Member Author):
@tlrx Oops, I've updated the link.

@martijnvg (Member Author):
(I updated the description with more information)

@jimczi (Contributor) commented Feb 4, 2025:

This feels a bit counterintuitive. Is it because the CCR buffer is big enough that reads end up being sequential? Also, is index sorting applied to the rally track? I’m trying to understand why this helps since we said index sorting would make sequential reads worse, yet this change applies it all the time.

@martijnvg (Member Author):
> Also, is index sorting applied to the rally track?

Yes, this is based on the elastic/logs track which sets index.mode to logsdb (which means index sort fields are host.name and @timestamp).

> I'm trying to understand why this helps since we said index sorting would make sequential reads worse, yet this change applies it all the time.

Reader is a misleading word in the title. This is just using a stored field reader implementation that decompresses an entire block at a time (this delegates to SequentialStoredFieldsLeafReader#getSequentialStoredFieldsReader(...), which then delegates to StoredFieldsReader#getMergeInstance()). In LuceneSyntheticSourceChangesSnapshot#transformScoreDocsToRecords(...) we always access stored fields in docid order (based on the requested seqno range). With this change, a stored field block is decompressed eagerly and each subsequent stored field read can be served without decompressing (until the next block). This is beneficial because of how stored fields are accessed in LuceneSyntheticSourceChangesSnapshot. Also, operations that haven't been replicated yet typically have higher timestamps (compared to already indexed logs), so they are likely to align with the index sorting.
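For context, a minimal sketch of what "sequential reader" means here, assuming the leaf is an unwrapped Lucene CodecReader; the actual Elasticsearch wiring goes through SequentialStoredFieldsLeafReader as described above.

```java
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.LeafReader;

// Sketch: obtain a stored fields reader optimized for sequential access.
// getMergeInstance() returns a reader that decompresses a whole block eagerly and keeps it
// around, so increasing doc ids within the same block avoid repeated decompression.
static StoredFieldsReader sequentialStoredFieldsReader(LeafReader leafReader) {
    CodecReader codecReader = (CodecReader) leafReader; // assumes an unwrapped codec reader
    return codecReader.getFieldsReader().getMergeInstance();
}
```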

@martijnvg changed the title from "Use sequential reader in LuceneSyntheticSourceChangesSnapshot" to "Use sequential stored field reader in LuceneSyntheticSourceChangesSnapshot" Feb 4, 2025
@jimczi (Contributor) commented Feb 4, 2025:

My comment was more that it's beneficial for a specific layout, where the block of doc ids to retrieve is dense, but here we apply it all the time. Performance in this mode might degrade when adding more hosts or when more diverse ingestion happens?
Or when the buffer is smaller?
Can we get some statistics from this rally track? Something like average gap size between documents?
In non-synthetic mode we apply sequential reader only if the entire range is completely dense. We can be more granular here and apply the heuristic for each block of doc ids that we load?

@martijnvg (Member Author):
> Performance in this mode might degrade when adding more hosts or when more diverse ingestion happens?

For the elastic/logs track, some data streams have 0 or 1 host, some have a few and some up to 50 hosts iirc.
The results from Rally's ccr telemetry device (which tracks follower stats by shard) suggest that all data streams benefit from this change.

> Or when the buffer is smaller?

Right, if the buffer is small then maybe the sequential stored field reader isn't the right choice.

> Can we get some statistics from this rally track? Something like average gap size between documents?

You mean the docid gap between requested seqnos? I will look into this.

> In non-synthetic mode we apply sequential reader only if the entire range is completely dense. We can be more granular here and apply the heuristic for each block of doc ids that we load?

Yes, the non-synthetic implementation selectively uses the sequential stored field reader based on the range of requested docids. Like you said, we can do that here too and be a little bit less strict: we can change the LuceneSyntheticSourceChangesSnapshot#loadDocuments(...) method to use the sequential stored field reader when the requested docids are close to each other.
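As a rough illustration of such a heuristic (a hypothetical helper, not the code in this PR): treat a sorted batch of doc ids as eligible for the sequential reader only when the ids form a dense range.

```java
// Hypothetical density check: use the sequential reader only when the requested doc ids are
// contiguous, mirroring the "entire range is completely dense" rule mentioned above.
static boolean useSequentialReader(int[] sortedDocIds) {
    if (sortedDocIds.length < 10) {
        return false; // too few docs for eager block decompression to pay off
    }
    int span = sortedDocIds[sortedDocIds.length - 1] - sortedDocIds[0] + 1;
    return span == sortedDocIds.length; // no gaps in the requested range
}
```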

//
// A sequential reader decompresses a block eagerly, so that increasing adjacent doc ids can access stored fields without
// decompressing on each StoredFields#document(docId) invocation. The only downside is that if the last few operations in the
// requested seq_no range are at the beginning of a block, stored fields for many docs are decompressed without being used.
@henningandersen (Contributor) commented Feb 4, 2025:

Is this true also for zstd? Or rather would that always decompress the entire thing?

martijnvg (Member Author):

I think it is true for both zstd and deflate, since both decompress an entire block.

@martijnvg (Member Author):
> Can we get some statistics from this rally track? Something like average gap size between documents?

I'm printing the average docid gap in LuceneSyntheticSourceChangesSnapshot#transformScoreDocsToRecords(...). Very often I see an average docid gap below 10 per ~1000 docs, which explains the positive effect of this change.
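The statistic mentioned above could be computed with something like the following hypothetical debug helper:

```java
// Hypothetical helper: average gap between consecutive requested doc ids in a batch.
static double averageDocIdGap(int[] sortedDocIds) {
    if (sortedDocIds.length < 2) {
        return 0d;
    }
    long totalGap = 0;
    for (int i = 1; i < sortedDocIds.length; i++) {
        totalGap += sortedDocIds[i] - sortedDocIds[i - 1];
    }
    return (double) totalGap / (sortedDocIds.length - 1);
}
```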

@martijnvg (Member Author):
One aspect of investigating the performance difference between stored and synthetic recovery source that isn't included in this PR history is that ccr replication is much slower with zstd compressed stored fields than with the previous default for best compression (deflate) (see results). The deflate stored field decompressor can skip decompressing unneeded blocks, whereas the zstd decompressor can't. Given that all docids are visited during ccr replication, a lot of blocks are repeatedly decompressed, and this is much heavier with the current default stored field codec for logsdb (zstd). This PR tries to address this at the LuceneSyntheticSourceChangesSnapshot level. @dnhatn had an interesting alternative idea that involves caching the last decompressed bytes via a weak reference and reusing them if the index input and file pointer match. I think his idea solves the problem experienced with ccr replication at the right level and could be useful for other stored field use cases too. We're running benchmarks to validate this approach.
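A minimal sketch of that alternative idea, purely for illustration (class, method and key names here are made up; the real change would live inside the stored fields decompression path):

```java
import java.lang.ref.WeakReference;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical cache of the last decompressed stored field block, keyed by the index input
// name and the block's file pointer. A weak reference lets the GC reclaim the block bytes.
final class LastDecompressedBlockCache {
    private record Key(String inputName, long filePointer) {}

    private Key lastKey;
    private WeakReference<byte[]> lastBlock = new WeakReference<>(null);

    byte[] getOrDecompress(String inputName, long filePointer, Supplier<byte[]> decompress) {
        Key key = new Key(inputName, filePointer);
        byte[] cached = lastBlock.get();
        if (Objects.equals(key, lastKey) && cached != null) {
            return cached; // same block as the previous read: skip decompression
        }
        byte[] block = decompress.get();
        lastKey = key;
        lastBlock = new WeakReference<>(block);
        return block;
    }
}
```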

@martijnvg (Member Author) commented Feb 7, 2025:

I noticed that many shard changes requests end up querying for monotonically increasing docids. So I did another iteration of this change, and now the sequential reader only gets used if docids are dense. This is similar to the implementation used for stored recovery source (LuceneChangesSnapshot).

To check the effect of this change I ran two benchmarks with the new elastic/logs logsdb with ccr track, the first using stored recovery source (baseline in the linked dashboard) and the second using synthetic recovery source (contender in the linked dashboard): https://esbench-metrics.kb.us-east-2.aws.elastic-cloud.com:9243/app/r/s/jbtVP

The results for stored and synthetic recovery source now look almost the same, which I think makes sense since both implementations now use the sequential reader roughly the same number of times.

@jimczi (Contributor) left a comment:

That's very interesting! In this benchmark, source recovery is fast because it primarily relies on the sequential reader. However, in #114618, we mentioned that this scenario should be quite rare, so I’m curious, what changed? That said, it's reassuring to see that synthetic source recovery can be just as fast, even when using the sequential reader.


int[] nextDocIdArray = nextDocIds.toArray();
leafFieldLoader = storedFieldLoader.getLoader(leafReaderContext, nextDocIdArray);
leafSourceLoader = sourceLoader.leaf(leafReaderContext.reader(), nextDocIdArray);
Contributor:

Another side effect of providing the array of document IDs is that some field loaders may choose to load their values eagerly. I don't see this as a problem, but I wanted to point out that we would lose this behavior if we implement the TODO above.

martijnvg (Member Author):

Agreed. I will update the TODO to include that perspective.

Contributor:

Note that some doc values loaders already apply this strategy when doc ids are provided and there is a single value per field:

@martijnvg (Member Author):
> However, in #114618, we mentioned that this scenario should be quite rare, so I'm curious, what changed?

I think the recently indexed logs often align well with the index sorting, so fetching recent operations by seqno ends up with dense, monotonically increasing docids. Some data streams have one or no unique host name, while others have up to 50 or so. I suspect that if host.name had a higher cardinality then we couldn't use a sequential stored field reader as often. But that would then also apply to LuceneChangesSnapshot.

> That said, it's reassuring to see that synthetic source recovery can be just as fast, even when using the sequential reader.

Agreed, that is a nice observation.

@martijnvg added the auto-backport label Feb 7, 2025
@dnhatn (Member) left a comment:

LGTM. Thanks Martijn!

@@ -9,6 +9,8 @@

package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.IntArrayList;
Contributor:

Did you intend to use this namespace (seems odd since it comes from a testing tool)?

Labels: auto-backport, :Distributed Indexing/Recovery, >non-issue, :StorageEngine/Logs, Team:Distributed Indexing, Team:StorageEngine, v8.18.1, v8.19.0, v9.0.1, v9.1.0
Projects: None yet
7 participants