
Explore use of LogByteSizeMergePolicy for time series data use cases #9241

Open
rishabhmaurya opened this issue Aug 10, 2023 · 9 comments
Labels
enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing v2.11.0 Issues and PRs related to version 2.11.0

Comments

@rishabhmaurya
Contributor

Is your feature request related to a problem? Please describe.
LogByteSizeMergePolicy always merges adjacent segments, which can be helpful for time series use cases where documents are sorted by timestamp and segments usually don't overlap much on timestamps. At query time, it is better if a time range is contained in fewer segments, so that the remaining segments can be skipped by checking the min/max values of the timestamp field. When adjacent segments are merged, the likelihood of this increases significantly.
TieredMergePolicy, the successor of LogByteSizeMergePolicy and the current default, merges segments more smartly and can merge non-adjacent segments too, which can be inefficient for time series data.
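The skipping behavior described above can be sketched in a few lines (a hypothetical simulation, not OpenSearch code): each segment records the min/max of its timestamp field, and a range query only has to open segments whose interval overlaps the query range. Adjacent-segment merges keep those intervals narrow and mostly disjoint; merging non-adjacent segments widens them.

```python
# Hypothetical simulation: each segment is a (min_ts, max_ts) pair; a range
# query can skip any segment whose interval doesn't overlap the query range.

def segments_to_search(segments, lo, hi):
    """Return the segments whose [min_ts, max_ts] overlaps [lo, hi]."""
    return [s for s in segments if s[0] <= hi and s[1] >= lo]

# Time-ordered ingestion with adjacent merges keeps per-segment ranges disjoint.
adjacent_merged = [(0, 99), (100, 199), (200, 299), (300, 399)]
# Merging non-adjacent segments (as TieredMergePolicy may) widens the ranges.
non_adjacent_merged = [(0, 299), (100, 399)]

print(len(segments_to_search(adjacent_merged, 150, 180)))      # 1 segment opened
print(len(segments_to_search(non_adjacent_merged, 150, 180)))  # 2: both wide segments match
```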

Describe the solution you'd like
Explore usage of LogByteSizeMergePolicy for data streams use cases where @timestamp is a mandatory field.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

@rishabhmaurya rishabhmaurya added enhancement Enhancement or improvement to existing feature or request untriaged labels Aug 10, 2023
@anasalkouz anasalkouz added Indexing Indexing, Bulk Indexing and anything related to indexing and removed untriaged labels Aug 11, 2023
@gashutos
Contributor

gashutos commented Aug 17, 2023

One more merge policy was introduced recently: the interleaved ShuffledMergePolicy.
It was introduced for exactly the opposite reason of what you are trying to achieve here: it shuffles time-series documents for better desc-order performance.
Desc sort order is a common scenario on time series workloads and is computation-heavy compared to range queries, which are very fast anyway due to BKD trees. I would be very curious to see how much improvement we can get with LogByteSizeMergePolicy.

I will be curious to see the numbers here, but definitely consider comparing sort performance along with range queries.

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 18, 2023

Thanks @gashutos for taking a look. I'm currently working on getting numbers out for comparison.
The shuffle merge policy applies only to force merges, which in most cases is the final merge operation before an index goes read-only. There, since the segments are fewer in number and bigger in size, it totally makes sense to interleave read operations across old and new segments.

range queries anyway are very fast due to BKDs and I would be very curious to see how much improvement we can get with LogByteSizeMergePolicy merge policy.

This optimization wouldn't change anything within a BKD tree, but the min/max timestamp ranges per segment will be significantly narrower after this change, and the points will be denser. Also, searching across segments would be more ordered.

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 21, 2023

I'm not seeing any improvement using LogByteSizeMergePolicy against the http_logs workload.

Concern 1:

http_logs doesn't generate a workload that matches real-world data stream use cases, where multiple clients ingest in time order.
The way the current logic works, the whole corpus is split into chunks and each indexing client is assigned an offset from which it starts ingesting concurrently. This randomizes the timestamp order, so LogByteSizeMergePolicy is not expected to perform any better in such scenarios and would only cause higher write and read amplification without any gain in read latencies.
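As a rough illustration of that concern (a toy model, not the benchmark's actual implementation), splitting a time-ordered corpus into per-client chunks and interleaving them concurrently makes even the earliest flushed segment span almost the whole time range:

```python
# Toy model of the benchmark's client-offset scheme: a time-ordered corpus is
# split into chunks, one client per chunk, and ingestion interleaves them.
corpus = list(range(1000))            # timestamps, already in order
num_clients = 8
chunk = len(corpus) // num_clients
streams = [corpus[i * chunk:(i + 1) * chunk] for i in range(num_clients)]

# Round-robin interleaving approximates concurrent ingestion by all clients.
ingested = [ts for batch in zip(*streams) for ts in batch]

# The first "segment" (first 100 docs) now spans nearly the whole corpus,
# so per-segment min/max timestamp ranges overlap heavily.
seg = ingested[:100]
print(min(seg), max(seg))
```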

Concern 2:

OpenSearch Benchmark currently completes ingestion and then runs queries for the http_logs workload, as per the test procedure defined here. This amounts to running queries only on read-only indices, which is usually not the case in production environments for time-based data like logs. LogByteSizeMergePolicy would be more effective for indices with active writes.

I'm currently working on setting up such a data stream locally to evaluate the improvements, if any. Meanwhile, I'm also working with opensearch-benchmark to add this capability - opensearch-project/opensearch-benchmark#365

This may sound like tailoring the use cases to the strengths of LogByteSizeMergePolicy, but they seem to me like valid use cases that should be incorporated into opensearch-benchmark.

tagging @nknize

Posting numbers with current logic -
Machine: r5.2xlarge
Heap: 32 GB

LogByteSizeMergePolicy

Using the following defaults -
https://github.com/rishabhmaurya/OpenSearch/blob/a0ccb11de2770f9fde7b49508410def89d02934a/server/src/main/java/org/opensearch/index/MergePolicyConfig.java#L231-L236

mergeFactor = 10
minMergeSize = 2MB
maxMergeSizeMB = 5GB
maxMergeMBForForcedMerge = -1 (or inf)
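For context, the core idea behind these settings can be sketched as follows (a simplified illustration of the Lucene algorithm, not the real implementation): each segment gets a level equal to the log, base mergeFactor, of its byte size (with a floor at minMergeSize), and only a run of mergeFactor *adjacent* segments on the same level is merged, which is why neighbors always combine.

```python
import math

MERGE_FACTOR = 10
MIN_MERGE_BYTES = 2 * 1024 * 1024  # 2 MB floor, matching the defaults above

def level(size_bytes):
    """Log-base-mergeFactor size level; tiny segments share one floor level."""
    return math.log(max(size_bytes, MIN_MERGE_BYTES), MERGE_FACTOR)

def find_adjacent_merge(sizes):
    """Return (start, end) of the first run of MERGE_FACTOR adjacent segments
    whose levels fall within one unit of each other, else None."""
    for start in range(len(sizes) - MERGE_FACTOR + 1):
        levels = [level(s) for s in sizes[start:start + MERGE_FACTOR]]
        if max(levels) - min(levels) < 1.0:
            return (start, start + MERGE_FACTOR)
    return None

# Ten freshly flushed ~4 MB segments land on the same level and merge together.
print(find_adjacent_merge([4 * 1024 * 1024] * 10))  # (0, 10): adjacent only
```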

Latency (ms)

| Operation Type | P50 | P90 | P99 | P100 |
|---|---|---|---|---|
| hourly_agg | 3,092.1 | 3,163.6 | 3,215.5 | 3,286.4 |
| asc_sort_timestamp | 5.657 | 6.398 | 7.855 | 8.041 |
| index-stats-wait-until-merges-finish | 22,793.3 | 26,391.2 | 26,391.2 | 26,391.2 |
| asc_sort_with_after_timestamp | 16.047 | 22.208 | 23.94 | 25.825 |
| index-stats-wait-until-merges-1-seg-finish | 40,995.5 | 72,900.9 | 72,900.9 | 72,900.9 |
| desc-sort-with-after-timestamp-after-force-merge-1-seg | 217.5 | 283.5 | 312.1 | 325.6 |
| force-merge-1-seg | 240,011.3 | 240,013 | 240,013 | 240,013 |
| scroll | 587 | 601.9 | 659.7 | 720.4 |
| range | 7.373 | 7.917 | 10.654 | 54.852 |
| index-append | 175.4 | 267.6 | 476.2 | 10,609.1 |
| desc_sort_with_after_timestamp | 129.8 | 143.5 | 155.4 | 180.7 |
| default | 4.349 | 4.554 | 4.845 | 6.617 |
| asc-sort-timestamp-after-force-merge-1-seg | 5.784 | 7.524 | 8.455 | 9.358 |
| 200s-in-range | 5.98 | 6.326 | 6.745 | 10.67 |
| 400s-in-range | 3.976 | 4.26 | 4.548 | 6.429 |
| term | 4.542 | 4.794 | 5.236 | 6.076 |
| desc_sort_timestamp | 48.449 | 70.893 | 76.449 | 78.903 |
| asc-sort-with-after-timestamp-after-force-merge-1-seg | 37.074 | 43.786 | 47.948 | 60.585 |
| desc-sort-timestamp-after-force-merge-1-seg | 181.8 | 228.5 | 262.5 | 270.1 |

TieredMergePolicy


Latency (ms)

| Operation Type | P50 | P90 | P99 | P100 |
|---|---|---|---|---|
| hourly_agg | 3,003.4 | 3,079.4 | 3,132.8 | 3,187.5 |
| asc_sort_timestamp | 7.053 | 8.149 | 9.872 | 10.126 |
| index-stats-wait-until-merges-finish | 5,586.6 | 58,237.1 | 58,237.1 | 58,237.1 |
| asc_sort_with_after_timestamp | 18.278 | 21.437 | 24.302 | 48.292 |
| index-stats-wait-until-merges-1-seg-finish | 19,230.3 | 103,239 | 103,239 | 103,239 |
| desc-sort-with-after-timestamp-after-force-merge-1-seg | 95.924 | 164.9 | 190.2 | 211 |
| force-merge-1-seg | 240,010.7 | 240,013 | 240,013 | 240,013 |
| scroll | 598.8 | 612.9 | 671.5 | 682.2 |
| range | 7.636 | 8.312 | 10.476 | 47.232 |
| index-append | 163.2 | 259.5 | 509.5 | 11,028.8 |
| desc_sort_with_after_timestamp | 41.707 | 55.354 | 68.997 | 99.183 |
| default | 4.556 | 4.824 | 5.579 | 6.17 |
| asc-sort-timestamp-after-force-merge-1-seg | 6.822 | 7.802 | 9.374 | 9.95 |
| 200s-in-range | 6.326 | 6.925 | 10.44 | 41.752 |
| 400s-in-range | 4.201 | 4.522 | 5.238 | 5.66 |
| term | 5.157 | 5.52 | 6.151 | 7.539 |
| desc_sort_timestamp | 18.51 | 24.365 | 49.348 | 56.725 |
| asc-sort-with-after-timestamp-after-force-merge-1-seg | 31.552 | 38.305 | 47.129 | 79.136 |
| desc-sort-timestamp-after-force-merge-1-seg | 113.8 | 169 | 181.1 | 184 |

@gashutos
Contributor

Thank you for the benchmark!! There is some regression as well.

http_logs doesn't generate a workload that matches real-world data stream use cases, where multiple clients ingest in time order.
The way the current logic works, the whole corpus is split into chunks and each client is assigned an offset from which it starts ingesting. This randomizes the timestamp order, so LogByteSizeMergePolicy is not expected to perform any better in such scenarios and would only cause higher write and read amplification without any gain in read latencies.

If you take a segment and look at the values, they are very much sorted by timestamp (nearly sorted). Just do a desc-order sort by timestamp on any segment and you will notice the results come from the last chunk of docIds in the segment.
Wouldn't this be a production-level scenario, where multiple services push logs, and push them in chunks? So it might only skew the sort order a little.

For testing, you could probably try with --workload-params="bulk_indexing_clients:1"?

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 22, 2023

I do see some performance gain when running locally on a system with the following configuration -

I had to customize the http_logs workload: I'm using just one log file, an index with one shard, and just a few operations - index-append, asc_sort_timestamp, desc_sort_timestamp.
I had to explicitly set the refresh interval to 1s so that the translog doesn't grow huge and new segments are created frequently, letting the merge policy come into the picture sooner. Without an explicit refresh interval, since all searches happen only after indexing and the force merge are complete, the translog keeps filling up and segments are created much later (when the explicit force merge is triggered), so the merge policy comes into the picture much later.

workload: customized http_logs 
index: logs-181998; shard count: 1; doc count: 2708746;
indexing client count: 1
benchmark machine: c4.2xlarge
data node count: 1; r5.2xlarge; heap: 32GB

With tiered merge policy

|                                                 Min Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                                Mean Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                              Median Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                                 Max Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                        50th percentile latency |      desc_sort_timestamp |     10.3268 |     ms |
|                                        90th percentile latency |      desc_sort_timestamp |     11.4483 |     ms |
|                                       100th percentile latency |      desc_sort_timestamp |     12.5894 |     ms |
|                                   50th percentile service time |      desc_sort_timestamp |     7.01704 |     ms |
|                                   90th percentile service time |      desc_sort_timestamp |     8.43393 |     ms |
|                                  100th percentile service time |      desc_sort_timestamp |     10.1131 |     ms |
|                                                     error rate |      desc_sort_timestamp |           0 |      % |
|                                                 Min Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                                Mean Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                              Median Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                                 Max Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                        50th percentile latency |       asc_sort_timestamp |     9.57576 |     ms |
|                                        90th percentile latency |       asc_sort_timestamp |     10.3733 |     ms |
|                                       100th percentile latency |       asc_sort_timestamp |     11.4519 |     ms |
|                                   50th percentile service time |       asc_sort_timestamp |      6.4471 |     ms |
|                                   90th percentile service time |       asc_sort_timestamp |     7.24644 |     ms |
|                                  100th percentile service time |       asc_sort_timestamp |      8.9884 |     ms |
|                                                     error rate |       asc_sort_timestamp |           0 |      % |

with log merge policy

mergeFactor = 10
minMergeSize = 2MB
maxMergeSizeMB = 5GB
maxMergeMBForForcedMerge = -1 (or inf)
|                                                 Min Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                                Mean Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                              Median Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                                 Max Throughput |      desc_sort_timestamp |        0.51 |  ops/s |
|                                        50th percentile latency |      desc_sort_timestamp |     9.71618 |     ms |
|                                        90th percentile latency |      desc_sort_timestamp |     10.8187 |     ms |
|                                       100th percentile latency |      desc_sort_timestamp |     12.6003 |     ms |
|                                   50th percentile service time |      desc_sort_timestamp |      6.6176 |     ms |
|                                   90th percentile service time |      desc_sort_timestamp |     7.80692 |     ms |
|                                  100th percentile service time |      desc_sort_timestamp |     9.29486 |     ms |
|                                                     error rate |      desc_sort_timestamp |           0 |      % |
|                                                 Min Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                                Mean Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                              Median Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                                 Max Throughput |       asc_sort_timestamp |        0.51 |  ops/s |
|                                        50th percentile latency |       asc_sort_timestamp |     8.70018 |     ms |
|                                        90th percentile latency |       asc_sort_timestamp |     9.96291 |     ms |
|                                       100th percentile latency |       asc_sort_timestamp |     10.8828 |     ms |
|                                   50th percentile service time |       asc_sort_timestamp |     5.62269 |     ms |
|                                   90th percentile service time |       asc_sort_timestamp |     6.75591 |     ms |
|                                  100th percentile service time |       asc_sort_timestamp |      8.3948 |     ms |
|                                                     error rate |       asc_sort_timestamp |           0 |      % |

If you take a segment and look at the values, they are very much sorted by timestamp (nearly sorted). Just do a desc-order sort by timestamp on any segment and you will notice the results come from the last chunk of docIds in the segment.
Wouldn't this be a production-level scenario, where multiple services push logs, and push them in chunks? So it might only skew the sort order a little.

If you look at the timestamp difference between the 1st and the 8th client at a given time, it's quite significant - opensearch-project/opensearch-benchmark#365 (comment)

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 23, 2023

Below is a snapshot of the segments created by both merge policies. The X axis denotes the segments, sorted by document count, and the Y axis represents the min/max values of the @timestamp field stored in the BKD. We can see that the delta between min and max grows large with the tiered MP. Given that the corpus is small, we didn't see much performance improvement.
Next, I'm working on ingesting more documents while taking care of the following -

  1. All indexing clients should process timestamps as close together as possible.
  2. Set explicit refresh intervals of 5s, 10s, and 30s and compare the performance.
  3. Measure the write amplification from merges and compare both policies.
  4. Compare the performance of asc_sort_with_after_timestamp and desc_sort_with_after_timestamp with different, more relevant search_after values, along with asc_sort_timestamp and desc_sort_timestamp.
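Item 3 above can be quantified with a simple ratio (a hypothetical sketch; the function name is illustrative): write amplification is the total bytes written by flushes plus merges, divided by the logical size of the index, so every merge that rewrites N bytes adds N to the numerator.

```python
def write_amplification(flushed_bytes, merge_events):
    """flushed_bytes: bytes written by the initial segment flushes.
    merge_events: list of bytes rewritten by each merge operation.
    Returns total bytes written / logical index size."""
    total_written = flushed_bytes + sum(merge_events)
    return total_written / flushed_bytes

# Example: 10 GB flushed, then two merge passes that each rewrite the full
# data set, gives an amplification of 3.0 (units cancel, so any scale works).
print(write_amplification(10_000, [10_000, 10_000]))  # 3.0
```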

Log Byte Size
LogMerge_2

Tiered
Tiered_2

@itiyama

itiyama commented Aug 23, 2023

@rishabhmaurya If time is really a concern, why not make the merge policy time aware?

@rishabhmaurya
Contributor Author

rishabhmaurya commented Aug 23, 2023

@itiyama I'm open to such experiments, but I think the gain may not be significant for the time series use case. Assuming the agents sending data from different machines don't have a significant time lag, merging adjacent segments ensures the timestamp overlap across segments stays small, whereas the tiered merge policy can worsen the overlap. A timestamp-based approach might help more when there is a significant lag between ingesting clients, but that's usually not the case, and we shouldn't optimize for such anomalies. Do you have ideas on how you would make use of the timestamps?
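For what it's worth, one way to interpret "time aware" (purely illustrative; no such policy exists today) would be to prefer merging the candidate segments whose combined timestamp range is narrowest, which naturally favors segments that already overlap:

```python
from itertools import combinations

def merge_cost(segments):
    """Width of the combined timestamp range of a candidate merge."""
    return max(s[1] for s in segments) - min(s[0] for s in segments)

def best_pair(segments):
    """Pick the pair of segments whose merge yields the narrowest range."""
    return min(combinations(segments, 2), key=merge_cost)

# The two overlapping segments merge; the distant one is left alone.
segs = [(0, 100), (90, 200), (500, 600)]
print(best_pair(segs))  # ((0, 100), (90, 200))
```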

@rishabhmaurya
Contributor Author

Some of the outstanding items before making LogByteSizeMergePolicy the default for timestamp-based indices -

  1. In the testing so far, the tail segments created by LogByteSizeMergePolicy are significantly bigger than with TieredMergePolicy, keeping the merge factor and min segment size the same. This increases latency for desc sort queries compared to TieredMergePolicy on the http_logs workload. Action item: fix "Timestamp ordering isn't maintained the way corpus is segmented into chunks among indexing clients" (opensearch-benchmark#365), then experiment with LogByteSizeMergePolicy settings and optimize for the http_logs dataset.
  2. Check if we can improve the behavior of LogByteSizeMergePolicy to avoid very large segments in the case of findForcedDeletesMerges, as discussed in "Configurable merge policy and option to choose between LogByteSize and Tiered MergePolicy" #9992 (comment)
