[Improve][connector-milvus] Improved milvus source enumerator splits allocation algorithm for subtasks#10868
[Improve][connector-milvus] Improved milvus source enumerator splits allocation algorithm for subtasks#10868JeremyXin wants to merge 4 commits into
Conversation
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the contribution. I reviewed the real Milvus source enumerator lifecycle on the latest head.
What this PR fixes
- User pain: the current
splitId.hashCode() % parallelismmapping can skew Milvus split ownership. - Fix approach: sort split ids first, then assign them round-robin.
- In one sentence: the idea is reasonable, but it does not hold on the real multi-collection runtime path.
Runtime path I checked
run() [MilvusSourceSplitEnumerator.java:95-111]
-> poll one collection from pendingTables
-> generateSplits(table) [117-165]
-> partition-key collections produce exactly one split [155-163]
-> addPendingSplit(splits) [171-185]
-> assignCount is reset to 0 on every call
-> assignSplit(readers) [192-201]
Problem 1: the real multi-collection path still concentrates work on one reader
- Location:
MilvusSourceSplitEnumerator.java:95-111, 155-185 - Why this is a problem:
run()processes collections one by one, and many collections produce exactly one split. SinceassignCountis reset to0inside everyaddPendingSplit()call, the first split of every such collection starts again from reader 0. - Risk: the PR can keep producing the same skew it is trying to remove, especially for workloads with many single-split collections.
- Suggested fix:
- Option A: keep
assignCountmonotonic across the whole enumeration cycle. - Option B: aggregate splits across collections first, then do one global round-robin pass.
- Option A: keep
- Severity: high
Problem 2: the new test does not cover the real multi-collection lifecycle
- Location:
MilvusSourceSplitEnumeratorTest.java:37-58 - Why this is a problem: the test only invokes the private helper once with one synthetic batch, so it never exercises the real
run()path where collections are processed one by one. - Risk: the current regression is happening in the real lifecycle, not in the helper itself.
- Suggested fix: add a
run()-level test that covers multiple single-split collections. - Severity: medium
One thing that is still okay here: addSplitsBack() keeps the returned splits pinned to the original subtaskId, so the recovery path itself is not the regression in this connector.
Conclusion: merge after fixes
- Blocking items
- Problem 1: the balancing claim is not true on the real multi-collection path.
- Suggested follow-up
- Problem 2: please add lifecycle-level coverage for the multi-collection
run()path.
There was a problem hiding this comment.
| log.info("Add back splits {} to MilvusSourceSplitEnumerator.", splits.size()); |
There was a problem hiding this comment.
done. Are there any other questions?
There was a problem hiding this comment.
| private String createSplitId(TablePath tablePath, String index) { |
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the update. I re-reviewed the latest head from scratch, including the real run() path and the source enumerator restore path.
What This PR Fixes
- User pain: the previous
splitId.hashCode() % parallelismstrategy could skew Milvus split ownership badly, especially when many collections only produce one split. - Fix approach: the latest head sorts split IDs and assigns owners with a global round-robin counter.
- One-line summary: the steady-state multi-collection path is fixed now, but the restore path still loses the round-robin counter and can reintroduce the same skew after failover.
Runtime Chain Rechecked
Normal enumeration
-> run() [95-111]
-> generateSplits(table) [117-165]
-> addPendingSplit(splits) [171-185]
-> assignCount is incremented globally
-> assignSplit(readers) [191-201]
Checkpoint / restore
-> snapshotState() [251-255]
-> only persists pendingTables + pendingSplits
-> constructor(sourceState) [75-81]
-> restores pendingTables + pendingSplits
-> assignCount starts from 0 again
-> remaining collections are assigned from reader 0 again
Key Findings
- The normal
run()path does now hit the intended global round-robin behavior, so the main steady-state issue from the previous review is closed. - But
assignCounthas become part of the allocation semantics and still is not included inMilvusSourceState. - That means the same skew can come back on the restore path, which is a real gap because this enumerator already implements checkpoint state.
Findings
Issue 1: assignCount is not part of checkpoint state, so restore restarts round-robin from reader 0
- Location:
MilvusSourceSplitEnumerator.java:63,75-81,171-188,251-255;MilvusSourceState.java:31-34 - Why it matters: if a checkpoint happens mid-enumeration, the remaining collections after restore no longer continue the global round-robin sequence.
- Risk: the load-balancing fix can regress in failover / restore scenarios.
- Suggested fix:
- Option A: persist
assignCountdirectly inMilvusSourceStateand restore it. - Option B: derive the next counter value from restored assignment progress, but that is more fragile.
- Option A: persist
- Severity: Medium
Issue 2: the new key state field assignCount still lacks an explanatory comment
- Location:
MilvusSourceSplitEnumerator.java:63 - Why it matters: this field now defines cross-collection allocation semantics, and whether it must be restored is not obvious without documentation.
- Risk: the same bug can easily come back during later refactors.
- Suggested fix: add a short English comment explaining that
assignCountmust stay global across collections and restore. - Severity: Medium
Merge Decision
Conclusion: can merge after fixes
- Blocking items
- Issue 1: persist or correctly reconstruct the round-robin counter across restore.
- Suggested follow-up
- Issue 2: document the responsibility of
assignCountdirectly in the code.
Overall, the latest head fixes the main steady-state lifecycle gap from the previous round, but I do not think this change is fully closed until the restore path preserves the same allocation semantics.
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the update. I re-reviewed the latest head from scratch against upstream/dev, including the steady-state run() path and the restore path that Daniel previously blocked.
What This PR Fixes
- User pain: the previous
splitId.hashCode() % parallelismstrategy could skew Milvus split ownership, especially when many collections only produce one split. - Fix approach: the current head keeps a global round-robin counter, sorts split ids before assignment, persists that counter in checkpoint state, and keeps returned splits pinned to their original reader.
- One-line summary: the normal multi-collection path and the restore path are now using the same allocation semantics.
Runtime Chain Rechecked
normal enumeration
-> run() [MilvusSourceSplitEnumerator.java:97-117]
-> poll one collection from pendingTables
-> generateSplits(table) [119-167]
-> addPendingSplit(splits) [173-187]
-> sort split ids
-> owner = assignCount % parallelism
-> assignCount++ persists across collections
-> assignSplit(readers) [193-203]
returned split path
-> addSplitsBack(splits, subtaskId) [212-226]
-> addPendingSplit(splits, subtaskId)
-> assign back to the same registered reader
checkpoint / restore
-> snapshotState() [252-259]
-> persist pendingTables + pendingSplits + assignCount
-> constructor(sourceState) [76-83]
-> restore assignCount before remaining collections continue
Findings
- The steady-state multi-collection skew issue from the previous Daniel review is fixed on the current head.
assignCountis no longer reset per collection. - The restore gap is also fixed now.
MilvusSourceStatepersistsassignCount, so failover no longer restarts the round-robin sequence from reader 0. - The returned-split path still pins work to the original reader, so the recovery contract remains consistent with the pre-change behavior.
- The new tests now cover the three runtime concerns that mattered here: multi-collection balancing, round-robin continuation after restore, and returned-split reassignment.
Compatibility / Side Effects
- No user-facing config or external protocol changed.
- The only new persisted field is
assignCount, which is exactly the state needed to keep restore semantics correct. - I do not see a new performance or concurrency problem in this patch; the extra state is just one integer plus deterministic sorting.
Tests / CI
- The lifecycle-oriented tests look directionally correct for this change.
- The current top-level
Buildis still red on the reviewed head. The visible failures includeDead links,unit-test (8, windows-latest), plus a wider set of cancelled / skipped follow-on jobs in the same run.
Conclusion: can merge after fixes
Blocking items:
- No source-level blocker from my side on the current head.
- Please get the latest
Buildback to green before merge.
Suggested non-blocking follow-up:
- None for the current code path.
The two runtime correctness gaps Daniel previously blocked are closed on this revision. The remaining gate is CI, not a reopened source-level problem.
|
Thanks, I saw the follow-up on the reviewer suggestions. From Daniel's side there is still no new code after the last rereview, so I'm keeping the current conclusion unchanged on this head. I do not have a reopened source-level blocker here. The last Daniel review already narrowed the remaining gate to CI. Once the latest Build is green on the current head, I would not require another full source review from my side unless new commits are pushed. |
Purpose of this pull request
Similar to pr #9108, improving milvus source enumerator splits allocation algorithm for subtasks and add UT.
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.