Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster batch segment allocation by reducing metadata IO #17420

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

AmatyaAvadhanula
Copy link
Contributor

Problem

Metadata IO is the main bottleneck for segment allocation times, which can contribute to lag significantly.
Currently, the metadata call to fetch used segments limits the allocation rate, and contributes to allocation wait time which can grow quite quickly. The sum of allocation times is a direct contributor to lag.

Idea

Segment allocation with time chunk locking requires only the segment ids and the number of core partitions to determine the next id.
We can issue a much less expensive call to fetch segmentIds instead of segments if the segment payloads are significantly large due to the dimensions and metrics.
The core partitions do not change for a given (datasource, interval, version) and we can fetch exactly one segment by id for such combinations and reduce the load on the metadata store, and also reduce the time taken to build the DataSegment object on the Overlord.

Usage

Adds a new feature flag
druid.indexer.tasklock.segmentAllocationReduceMetadataIO with default value false

Setting this flag to true allows segment allocation to fetch only the required segmentIds and fewer segment payloads from the metadata store.
At present, this flag is only applicable for TimeChunk locking with batch segment allocation.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Copy link
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestions.

@@ -36,6 +36,9 @@ public class TaskLockConfig
@JsonProperty
private long batchAllocationWaitTime = 0L;

@JsonProperty
private boolean segmentAllocationReduceMetadataIO = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used only for batch segment allocation, IIUC. Let's rename it to batchAllocationReduceMetadataIO.
Once this has been tested thoroughly, we can remove the flag altogether and use the new approach for both regular and batch allocation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you against its usage in the normal flow with the same feature flag?
It's behind a flag anyway so I don't think there's a need to not make the changes there.

String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
Collection<SegmentAllocationHolder> holders
Collection<SegmentAllocationHolder> holders,
boolean skipSegmentPayloadFetchForAllocation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename this argument everywhere.

You can either call it reduceMetadataIO same as the config or fetchRequiredSegmentsOnly or something.

Suggested change
boolean skipSegmentPayloadFetchForAllocation
boolean reduceMetadataIO

@@ -87,6 +87,8 @@ public class SegmentAllocationQueue
private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);

private final boolean skipSegmentPayloadFetchForAllocation;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename as suggested.

@@ -99,6 +121,133 @@ public void tearDown()
emitter.flush();
}

@Test
@Ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this test if we are not running it as a UT. If it is meant to be a benchmark, please put it in a Benchmark class and share the details of a some sample run in the PR description.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, will remove it.

@Override
public boolean isSegmentAllocationReduceMetadataIO()
{
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to run the tests in this class for both true and false and not just true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do this

);
}

// Populate the required segment info
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Populate the required segment info
// Create dummy segments for each segmentId with only the shard spec populated

Segments.ONLY_VISIBLE
)
);
return metadataStorage.getSegmentTimelineForAllocation(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it inefficient here if skipSegmentPayloadFetchForAllocation is true? We are getting segments from retrieveUsedSegmentsForAllocation then creating a timeline via SegmentTimeline.forSegments and then getting segments back again via findNonOvershadowedObjectsInInterval. Why do we even need to create a timeline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean when it is false?
If it is true, we are simply getting all the used ids in the retrieval call, so we'd have to create a SegmentTimeline as we're interested only in the visible segment set


// Retrieve the segments for the ids stored in the map to get the numCorePartitions
final Set<String> segmentIdsToRetrieve = new HashSet<>();
for (Map<Interval, SegmentId> itvlMap : versionIntervalToSmallestSegmentId.values()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want/use the Smallest here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was so that we get a consistent result irrespective of the order in which the metadata store returns results

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add as comment on the use of Smallest. Thanks

return retrieveSegmentIds(dataSource, Collections.singletonList(interval));
}

private Set<SegmentId> retrieveSegmentIds(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this also be called retrieveUsedSegmentIds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should. Thanks, I'll make the change.

@AmatyaAvadhanula AmatyaAvadhanula marked this pull request as draft October 29, 2024 23:37
@AmatyaAvadhanula
Copy link
Contributor Author

AmatyaAvadhanula commented Oct 29, 2024

I've added a TaskLockPosse level cache for segment allocations with the idea that the used segment set and pending segments are covered by an appending lock and cannot change unless altered by tasks holding these locks.

There are 24 intervals, each with 2000 datasegments, each with 1000 dimensions.
With with a batching of 5, and 128 allocations (2 replicas) per interval i.e 3072 in total

Here are the results using a simple unit test:

  • Batch segment allocation with the flag turned off
INFO [main] org.apache.druid.indexing.common.actions.SegmentAllocateActionTest - Total time taken for [3072] allocations is [653233] ms.
  • Batch segment allocation with the flag turned on : 3x Improvement
INFO [main] org.apache.druid.indexing.common.actions.SegmentAllocateActionTest - Total time taken for [3072] allocations is [236854] ms.
  • Normal segment allocation with the mentioned caching with the flag: 100x improvement
INFO [main] org.apache.druid.indexing.common.actions.SegmentAllocateActionTest - Total time taken for [3072] allocations is [6302] ms.


TLDR : Turning the flag on gives a 3x boost over batch segment allocation, while using the cache gives about 100x.

@kfaraz
Copy link
Contributor

kfaraz commented Nov 4, 2024

@AmatyaAvadhanula , caching the segment state seems like a good idea.

But it would be better to limit this PR to have only the reduceMetadataIO changes which fetch only the segment IDs and not all the payloads. That change has already been tested on some clusters and it is best to keep it as a separate patch from other experimental changes.

The caching changes can be done as a follow up to this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants