[Improve][connector-milvus] Improved milvus source enumerator splits allocation algorithm for subtasks#10868

Open
JeremyXin wants to merge 4 commits into apache:dev from JeremyXin:improve-milvus-split-balance

@JeremyXin
Contributor

Purpose of this pull request

Similar to PR #9108, this PR improves the Milvus source enumerator's split allocation algorithm for subtasks and adds unit tests.

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

Contributor

@DanielLeens DanielLeens left a comment

Thanks for the contribution. I reviewed the real Milvus source enumerator lifecycle on the latest head.

What this PR fixes

  • User pain: the current splitId.hashCode() % parallelism mapping can skew Milvus split ownership.
  • Fix approach: sort split ids first, then assign them round-robin.
  • In one sentence: the idea is reasonable, but it does not hold on the real multi-collection runtime path.
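
To make the difference between the two strategies concrete, here is a minimal Java sketch. It is not the connector's code: `SplitOwnerSketch`, `hashOwner`, and `roundRobin` are hypothetical names, and the use of `Math.floorMod` is an assumption standing in for whatever sign handling the real enumerator applies to `hashCode()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only; not the connector's actual code. */
final class SplitOwnerSketch {

    /** Hash-based owner, in the spirit of splitId.hashCode() % parallelism. */
    static int hashOwner(String splitId, int parallelism) {
        // Assumption: floorMod keeps the index non-negative for negative hash codes.
        return Math.floorMod(splitId.hashCode(), parallelism);
    }

    /** Sorts the ids, then deals them out in turn. Returns reader index -> split ids. */
    static Map<Integer, List<String>> roundRobin(List<String> splitIds, int parallelism) {
        List<String> sorted = new ArrayList<>(splitIds);
        Collections.sort(sorted);
        Map<Integer, List<String>> owners = new TreeMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            owners.computeIfAbsent(i % parallelism, k -> new ArrayList<>()).add(sorted.get(i));
        }
        return owners;
    }

    public static void main(String[] args) {
        // "Aa" and "BB" have the same String.hashCode(), so hashing always co-locates them.
        System.out.println(hashOwner("Aa", 2) + " " + hashOwner("BB", 2));
        // Sorted round-robin spreads the same two ids over both readers.
        System.out.println(roundRobin(Arrays.asList("BB", "Aa"), 2));
    }
}
```

With two readers, the hash-based mapping sends both ids to the same reader, while the sorted round-robin pass gives each reader one split. Sorting first also makes the assignment deterministic regardless of the order in which the ids arrive.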

Runtime path I checked

run() [MilvusSourceSplitEnumerator.java:95-111]
  -> poll one collection from pendingTables
  -> generateSplits(table) [117-165]
      -> partition-key collections produce exactly one split [155-163]
  -> addPendingSplit(splits) [171-185]
      -> assignCount is reset to 0 on every call
  -> assignSplit(readers) [192-201]

Problem 1: the real multi-collection path still concentrates work on one reader

  • Location: MilvusSourceSplitEnumerator.java:95-111, 155-185
  • Why this is a problem: run() processes collections one by one, and many collections produce exactly one split. Since assignCount is reset to 0 inside every addPendingSplit() call, the first split of every such collection starts again from reader 0.
  • Risk: the PR can keep producing the same skew it is trying to remove, especially for workloads with many single-split collections.
  • Suggested fix:
    • Option A: keep assignCount monotonic across the whole enumeration cycle.
    • Option B: aggregate splits across collections first, then do one global round-robin pass.
  • Severity: high
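
The effect of the per-call reset can be reproduced with a small simulation. This is a sketch under stated assumptions, not the enumerator itself: `AssignCounterSketch` and its `resetPerBatch` flag are hypothetical, and each `addPendingSplit()` call here stands for the splits of one collection as produced on the `run()` path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only: counts how many splits each reader receives. */
final class AssignCounterSketch {
    private final int parallelism;
    private final boolean resetPerBatch; // hypothetical switch between the two behaviors
    private final int[] splitsPerReader;
    private int assignCount;

    AssignCounterSketch(int parallelism, boolean resetPerBatch) {
        this.parallelism = parallelism;
        this.resetPerBatch = resetPerBatch;
        this.splitsPerReader = new int[parallelism];
    }

    /** Mimics one addPendingSplit() call for the splits of a single collection. */
    void addPendingSplit(List<String> splitIds) {
        if (resetPerBatch) {
            assignCount = 0; // the behavior flagged in Problem 1
        }
        List<String> sorted = new ArrayList<>(splitIds);
        Collections.sort(sorted);
        for (int i = 0; i < sorted.size(); i++) {
            splitsPerReader[assignCount % parallelism]++;
            assignCount++;
        }
    }

    int[] load() {
        return splitsPerReader.clone();
    }

    public static void main(String[] args) {
        AssignCounterSketch reset = new AssignCounterSketch(4, true);
        AssignCounterSketch global = new AssignCounterSketch(4, false);
        // Four collections, each producing exactly one split.
        for (int i = 0; i < 4; i++) {
            List<String> batch = Collections.singletonList("collection_" + i + "-0");
            reset.addPendingSplit(batch);
            global.addPendingSplit(batch);
        }
        System.out.println(Arrays.toString(reset.load()));  // [4, 0, 0, 0]
        System.out.println(Arrays.toString(global.load())); // [1, 1, 1, 1]
    }
}
```

With four single-split collections and four readers, the resetting variant puts every split on reader 0, while the monotonic counter (Option A) gives each reader one split. A `run()`-level test for the real enumerator could assert the same distribution.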

Problem 2: the new test does not cover the real multi-collection lifecycle

  • Location: MilvusSourceSplitEnumeratorTest.java:37-58
  • Why this is a problem: the test only invokes the private helper once with one synthetic batch, so it never exercises the real run() path where collections are processed one by one.
  • Risk: the current regression is happening in the real lifecycle, not in the helper itself.
  • Suggested fix: add a run()-level test that covers multiple single-split collections.
  • Severity: medium

One thing that is still okay here: addSplitsBack() keeps the returned splits pinned to the original subtaskId, so the recovery path itself is not the regression in this connector.

Conclusion: merge after fixes

  1. Blocking items
  • Problem 1: the balancing claim is not true on the real multi-collection path.
  2. Suggested follow-up
  • Problem 2: please add lifecycle-level coverage for the multi-collection run() path.

Member

Suggested change
log.info("Add back splits {} to MilvusSourceSplitEnumerator.", splits.size());

Contributor Author

Done. Are there any other questions?

Member

Suggested change
private String createSplitId(TablePath tablePath, String index) {

Contributor Author

done

Contributor

@DanielLeens DanielLeens left a comment

Thanks for the update. I re-reviewed the latest head from scratch, including the real run() path and the source enumerator restore path.

What This PR Fixes

  • User pain: the previous splitId.hashCode() % parallelism strategy could skew Milvus split ownership badly, especially when many collections only produce one split.
  • Fix approach: the latest head sorts split IDs and assigns owners with a global round-robin counter.
  • One-line summary: the steady-state multi-collection path is fixed now, but the restore path still loses the round-robin counter and can reintroduce the same skew after failover.

Runtime Chain Rechecked

Normal enumeration
  -> run() [95-111]
      -> generateSplits(table) [117-165]
      -> addPendingSplit(splits) [171-185]
          -> assignCount is incremented globally
      -> assignSplit(readers) [191-201]

Checkpoint / restore
  -> snapshotState() [251-255]
      -> only persists pendingTables + pendingSplits
  -> constructor(sourceState) [75-81]
      -> restores pendingTables + pendingSplits
      -> assignCount starts from 0 again
  -> remaining collections are assigned from reader 0 again

Key Findings

  • The normal run() path does now hit the intended global round-robin behavior, so the main steady-state issue from the previous review is closed.
  • But assignCount has become part of the allocation semantics and still is not included in MilvusSourceState.
  • That means the same skew can come back on the restore path, which is a real gap because this enumerator already implements checkpoint state.

Findings

Issue 1: assignCount is not part of checkpoint state, so restore restarts round-robin from reader 0

  • Location: MilvusSourceSplitEnumerator.java:63,75-81,171-188,251-255; MilvusSourceState.java:31-34
  • Why it matters: if a checkpoint happens mid-enumeration, the remaining collections after restore no longer continue the global round-robin sequence.
  • Risk: the load-balancing fix can regress in failover / restore scenarios.
  • Suggested fix:
    • Option A: persist assignCount directly in MilvusSourceState and restore it.
    • Option B: derive the next counter value from restored assignment progress, but that is more fragile.
  • Severity: Medium
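
Option A could look roughly like the following. This is a minimal sketch, not the actual `MilvusSourceState`: the nested `State` class, the `assignNext()` helper, and the assumption that every collection yields exactly one split are all hypothetical simplifications, and the real state also carries pendingSplits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative only: an enumerator whose round-robin counter survives restore. */
final class RestorableEnumeratorSketch {

    /** Hypothetical checkpoint state: pending collections plus the counter. */
    static final class State {
        final List<String> pendingTables;
        final int assignCount;

        State(List<String> pendingTables, int assignCount) {
            this.pendingTables = new ArrayList<>(pendingTables);
            this.assignCount = assignCount;
        }
    }

    private final int parallelism;
    private final Deque<String> pendingTables;
    // Must stay global across collections and be restored from the checkpoint.
    private int assignCount;

    /** Fresh start: the counter begins at 0. */
    RestorableEnumeratorSketch(int parallelism, List<String> tables) {
        this(parallelism, new State(tables, 0));
    }

    /** Restore: the counter continues from the checkpointed value. */
    RestorableEnumeratorSketch(int parallelism, State restored) {
        this.parallelism = parallelism;
        this.pendingTables = new ArrayDeque<>(restored.pendingTables);
        this.assignCount = restored.assignCount;
    }

    /** Assigns the single split of the next pending collection and returns its owner. */
    int assignNext() {
        if (pendingTables.poll() == null) {
            throw new IllegalStateException("no pending collection");
        }
        int owner = assignCount % parallelism;
        assignCount++;
        return owner;
    }

    /** Snapshot includes the counter, not only the remaining work. */
    State snapshotState() {
        return new State(new ArrayList<>(pendingTables), assignCount);
    }
}
```

With three readers and four collections, assigning two collections, snapshotting, and restoring yields owner 2 for the next collection. Without the counter in the state, the restored enumerator would hand that collection to reader 0 again.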

Issue 2: the new key state field assignCount still lacks an explanatory comment

  • Location: MilvusSourceSplitEnumerator.java:63
  • Why it matters: this field now defines cross-collection allocation semantics, and whether it must be restored is not obvious without documentation.
  • Risk: the same bug can easily come back during later refactors.
  • Suggested fix: add a short English comment explaining that assignCount must stay global across collections and restore.
  • Severity: Medium

Merge Decision

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: persist or correctly reconstruct the round-robin counter across restore.
  2. Suggested follow-up
  • Issue 2: document the responsibility of assignCount directly in the code.

Overall, the latest head fixes the main steady-state lifecycle gap from the previous round, but I do not think this change is fully closed until the restore path preserves the same allocation semantics.

Contributor

@DanielLeens DanielLeens left a comment

Thanks for the update. I re-reviewed the latest head from scratch against upstream/dev, including the steady-state run() path and the restore path that Daniel previously blocked.

What This PR Fixes

  • User pain: the previous splitId.hashCode() % parallelism strategy could skew Milvus split ownership, especially when many collections only produce one split.
  • Fix approach: the current head keeps a global round-robin counter, sorts split ids before assignment, persists that counter in checkpoint state, and keeps returned splits pinned to their original reader.
  • One-line summary: the normal multi-collection path and the restore path are now using the same allocation semantics.

Runtime Chain Rechecked

normal enumeration
  -> run() [MilvusSourceSplitEnumerator.java:97-117]
      -> poll one collection from pendingTables
      -> generateSplits(table) [119-167]
      -> addPendingSplit(splits) [173-187]
          -> sort split ids
          -> owner = assignCount % parallelism
          -> assignCount++ persists across collections
      -> assignSplit(readers) [193-203]

returned split path
  -> addSplitsBack(splits, subtaskId) [212-226]
      -> addPendingSplit(splits, subtaskId)
      -> assign back to the same registered reader

checkpoint / restore
  -> snapshotState() [252-259]
      -> persist pendingTables + pendingSplits + assignCount
  -> constructor(sourceState) [76-83]
      -> restore assignCount before remaining collections continue

Findings

  1. The steady-state multi-collection skew issue from the previous Daniel review is fixed on the current head. assignCount is no longer reset per collection.
  2. The restore gap is also fixed now. MilvusSourceState persists assignCount, so failover no longer restarts the round-robin sequence from reader 0.
  3. The returned-split path still pins work to the original reader, so the recovery contract remains consistent with the pre-change behavior.
  4. The new tests now cover the three runtime concerns that mattered here: multi-collection balancing, round-robin continuation after restore, and returned-split reassignment.
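
The interaction between the global counter and the returned-split path can be sketched as follows. This is an illustration under assumptions, not the connector's implementation: `SplitsBackSketch` and `drain()` are hypothetical names, and the sketch assumes that returning splits to a reader does not advance `assignCount`, which the review does not state explicitly.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: new splits go round-robin, returned splits stay pinned. */
final class SplitsBackSketch {
    private final int parallelism;
    private final Map<Integer, List<String>> pendingSplits = new HashMap<>();
    private int assignCount;

    SplitsBackSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /** New splits: sort the ids, then pick the owner from the global counter. */
    void addPendingSplit(Collection<String> splitIds) {
        List<String> sorted = new ArrayList<>(splitIds);
        Collections.sort(sorted);
        for (String splitId : sorted) {
            int owner = assignCount % parallelism;
            assignCount++;
            pendingSplits.computeIfAbsent(owner, k -> new ArrayList<>()).add(splitId);
        }
    }

    /** Returned splits: keep the original owner; the counter is left untouched (assumption). */
    void addSplitsBack(List<String> splitIds, int subtaskId) {
        pendingSplits.computeIfAbsent(subtaskId, k -> new ArrayList<>()).addAll(splitIds);
    }

    /** Hands a reader its pending splits and clears them, like an assignSplit() step. */
    List<String> drain(int reader) {
        List<String> splits = pendingSplits.remove(reader);
        return splits == null ? new ArrayList<>() : splits;
    }
}
```

In this sketch, a split returned by reader 1 is queued for reader 1 again, and the next new collection still continues the round-robin sequence from where the counter left off.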

Compatibility / Side Effects

  • No user-facing config or external protocol changed.
  • The only new persisted field is assignCount, which is exactly the state needed to keep restore semantics correct.
  • I do not see a new performance or concurrency problem in this patch; the extra state is just one integer plus deterministic sorting.

Tests / CI

  • The lifecycle-oriented tests look directionally correct for this change.
  • The current top-level Build is still red on the reviewed head. The visible failures include Dead links, unit-test (8, windows-latest), plus a wider set of cancelled / skipped follow-on jobs in the same run.

Conclusion: can merge after fixes

Blocking items:

  1. No source-level blocker from my side on the current head.
  2. Please get the latest Build back to green before merge.

Suggested non-blocking follow-up:

  • None for the current code path.

The two runtime correctness gaps Daniel previously blocked are closed on this revision. The remaining gate is CI, not a reopened source-level problem.

@DanielLeens
Contributor

Thanks, I saw the follow-up on the reviewer suggestions.

From Daniel's side there is still no new code after the last re-review, so I'm keeping the current conclusion unchanged on this head.

I do not have a reopened source-level blocker here. The last Daniel review already narrowed the remaining gate to CI. Once the latest Build is green on the current head, I would not require another full source review from my side unless new commits are pushed.
