Faster batch segment allocation by reducing metadata IO #17420
base: master
Conversation
...rvice/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
Minor suggestions.
@@ -36,6 +36,9 @@ public class TaskLockConfig
  @JsonProperty
  private long batchAllocationWaitTime = 0L;

  @JsonProperty
  private boolean segmentAllocationReduceMetadataIO = false;
This is used only for batch segment allocation, IIUC. Let's rename it to batchAllocationReduceMetadataIO.
Once this has been tested thoroughly, we can remove the flag altogether and use the new approach for both regular and batch allocation.
Are you against using it in the normal flow with the same feature flag?
It's behind a flag anyway, so I don't think there's a need to avoid making the changes there.
  String dataSource,
  Interval interval,
  boolean skipSegmentLineageCheck,
- Collection<SegmentAllocationHolder> holders
+ Collection<SegmentAllocationHolder> holders,
+ boolean skipSegmentPayloadFetchForAllocation
Please rename this argument everywhere. You can either call it reduceMetadataIO, same as the config, or fetchRequiredSegmentsOnly, or something else.
Suggested change:
- boolean skipSegmentPayloadFetchForAllocation
+ boolean reduceMetadataIO
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -87,6 +87,8 @@ public class SegmentAllocationQueue
  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
  private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);

  private final boolean skipSegmentPayloadFetchForAllocation;
Please rename as suggested.
@@ -99,6 +121,133 @@ public void tearDown()
  emitter.flush();
}

@Test
@Ignore
Let's remove this test if we are not running it as a UT. If it is meant to be a benchmark, please put it in a Benchmark class and share the details of a sample run in the PR description.
Understood, will remove it.
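If it does end up as a benchmark instead, a minimal JMH skeleton along these lines could work (class and method names here are hypothetical; the measured call is just a stand-in):

```java
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

// Hypothetical benchmark skeleton: measures average time per batch allocation call.
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class SegmentAllocationBenchmark
{
  @Benchmark
  public void allocateSegmentsBatch()
  {
    // Stand-in for the batch segment allocation call being measured.
  }
}
```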
@Override
public boolean isSegmentAllocationReduceMetadataIO()
{
  return true;
Would be nice to run the tests in this class for both true and false and not just true.
Will do this
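One way to do that is a parameterized test that runs the suite with the flag both enabled and disabled; a minimal sketch using JUnit 4's Parameterized runner (class and test names are illustrative only):

```java
import java.util.Arrays;
import java.util.Collection;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Illustrative sketch: run the same allocation tests with reduceMetadataIO = true and false.
@RunWith(Parameterized.class)
public class SegmentAllocationFlagTest
{
  @Parameterized.Parameters(name = "reduceMetadataIO = {0}")
  public static Collection<Object[]> parameters()
  {
    return Arrays.asList(new Object[][]{{true}, {false}});
  }

  private final boolean reduceMetadataIO;

  public SegmentAllocationFlagTest(boolean reduceMetadataIO)
  {
    this.reduceMetadataIO = reduceMetadataIO;
  }

  @Test
  public void testAllocation()
  {
    // The real tests would inject the flag value into the storage coordinator / allocation
    // queue and assert that allocation behaves identically in both modes.
    Assert.assertNotNull(reduceMetadataIO ? "reduced IO path" : "full payload path");
  }
}
```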
  );
}

// Populate the required segment info
Suggested change:
- // Populate the required segment info
+ // Create dummy segments for each segmentId with only the shard spec populated
    Segments.ONLY_VISIBLE
  )
);
return metadataStorage.getSegmentTimelineForAllocation(
Is it inefficient here if skipSegmentPayloadFetchForAllocation is true? We are getting segments from retrieveUsedSegmentsForAllocation then creating a timeline via SegmentTimeline.forSegments and then getting segments back again via findNonOvershadowedObjectsInInterval. Why do we even need to create a timeline?
Do you mean when it is false?
If it is true, we are simply getting all the used ids in the retrieval call, so we'd have to create a SegmentTimeline as we're interested only in the visible segment set.
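For illustration, a minimal sketch of that step, assuming the Druid utilities mentioned in this thread (SegmentTimeline.forSegments and findNonOvershadowedObjectsInInterval); the wrapper class and method are hypothetical:

```java
import java.util.Collection;
import java.util.Set;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

// Sketch: even when only ids (wrapped in lightweight DataSegment objects) are fetched,
// a timeline is still built so that overshadowed segments are dropped and only the
// visible set for the allocation interval is used.
class VisibleSegmentsSketch
{
  static Set<DataSegment> visibleSegments(Collection<DataSegment> usedSegments, Interval interval)
  {
    final SegmentTimeline timeline = SegmentTimeline.forSegments(usedSegments);
    return timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
  }
}
```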
// Retrieve the segments for the ids stored in the map to get the numCorePartitions
final Set<String> segmentIdsToRetrieve = new HashSet<>();
for (Map<Interval, SegmentId> itvlMap : versionIntervalToSmallestSegmentId.values()) {
Why do we want/use the Smallest here?
The idea was so that we get a consistent result irrespective of the order in which the metadata store returns results.
Can you add a comment on the use of the smallest id? Thanks.
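To make the intent concrete, a small hypothetical sketch (not the PR's code) of tracking the smallest SegmentId per (version, interval), so the id used for the follow-up payload fetch does not depend on the row order returned by the metadata store:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

// Hypothetical sketch: keep the smallest SegmentId per (version, interval) so that the
// representative segment fetched later for numCorePartitions is chosen deterministically.
class SmallestSegmentIdTracker
{
  private final Map<String, Map<Interval, SegmentId>> versionIntervalToSmallestSegmentId = new HashMap<>();

  void observe(SegmentId segmentId)
  {
    versionIntervalToSmallestSegmentId
        .computeIfAbsent(segmentId.getVersion(), version -> new HashMap<>())
        .merge(
            segmentId.getInterval(),
            segmentId,
            (existing, incoming) -> existing.compareTo(incoming) <= 0 ? existing : incoming
        );
  }
}
```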
  return retrieveSegmentIds(dataSource, Collections.singletonList(interval));
}

private Set<SegmentId> retrieveSegmentIds(
Should this also be called retrieveUsedSegmentIds?
Yes, it should. Thanks, I'll make the change.
I've added a TaskLockPosse-level cache for segment allocations, with the idea that the used segment set and pending segments are covered by an appending lock and cannot change unless altered by tasks holding these locks. There are 24 intervals, each with 2000 DataSegments, each with 1000 dimensions. Here are the results using a simple unit test:
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@AmatyaAvadhanula, caching the segment state seems like a good idea. But it would be better to limit this PR to only the reduced metadata IO changes. The caching changes can be done as a follow-up to this PR.
Problem
Metadata IO is the main bottleneck for segment allocation times and can contribute significantly to lag.
Currently, the metadata call to fetch used segments limits the allocation rate and adds to the allocation wait time, which can grow quite quickly. The sum of allocation times is a direct contributor to lag.
Idea
Segment allocation with time chunk locking requires only the segment ids and the number of core partitions to determine the next id. We can issue a much less expensive call to fetch segment ids instead of full segments when the segment payloads are large due to the number of dimensions and metrics.
The number of core partitions does not change for a given (datasource, interval, version), so we can fetch exactly one segment by id for each such combination. This reduces the load on the metadata store and also the time taken to build DataSegment objects on the Overlord.
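As a rough illustration under these assumptions (the helper below is hypothetical, not the PR's code): the next partition number for a given version can be derived from the existing segment ids alone, and only one representative payload per (datasource, interval, version) is needed to read numCorePartitions.

```java
import java.util.Set;
import org.apache.druid.timeline.SegmentId;

// Illustrative sketch only: with time chunk locking, the next partition number for a
// (datasource, interval, version) follows from the existing segment ids, so full segment
// payloads are not required for this part of allocation.
final class NextPartitionNumberSketch
{
  static int nextPartitionNumber(Set<SegmentId> usedIdsForIntervalAndVersion)
  {
    return usedIdsForIntervalAndVersion
        .stream()
        .mapToInt(SegmentId::getPartitionNum)
        .max()
        .orElse(-1) + 1;
  }
}
```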
Usage
Adds a new feature flag druid.indexer.tasklock.segmentAllocationReduceMetadataIO with default value false.
Setting this flag to true allows segment allocation to fetch only the required segment ids and fewer segment payloads from the metadata store.
At present, this flag is only applicable for time chunk locking with batch segment allocation.
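For example, in the Overlord runtime properties (the flag name and default come from this PR; enabling it only has an effect when batch segment allocation with time chunk locking is in use):

```properties
# Fetch only the required segment ids and fewer segment payloads during batch segment allocation
druid.indexer.tasklock.segmentAllocationReduceMetadataIO=true
```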
This PR has: