
[OPIK-6602] [SDK] feat: serialize migrate streamer + add concurrency diagnostics#6807

Open
JetoPistola wants to merge 3 commits into main from danield/OPIK-6602-migrate-avoid-concurrent-writes

Conversation

Contributor

@JetoPistola JetoPistola commented May 20, 2026

Details

opik migrate now forces OPIK_BACKGROUND_WORKERS=1 for the migrate-time opik.Opik() client. This reduces workspace-level in-flight HTTP writes during the experiment cascade from up to 5 → 2 (4 streamer consumers + main thread → 1 streamer consumer + main thread). Migrate's dataset writes were already serialized on the main thread (raw Fern client); this collapses the trace/span/feedback streamer to a single consumer alongside it.
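One way to pin a setting like this for the lifetime of a single client construction is a small environment-variable context manager. The sketch below is illustrative only: `forced_env` is a hypothetical helper, not the code in `cli/migrate/main.py`, and it assumes the SDK reads `OPIK_BACKGROUND_WORKERS` at the moment the client is built.

```python
import os
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def forced_env(name: str, value: str) -> Iterator[None]:
    """Temporarily set an environment variable, restoring the old state on exit."""
    previous = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous


# Hypothetical usage: the migrate-time client would be built inside the block,
# so it sees a single background worker regardless of the caller's environment.
with forced_env("OPIK_BACKGROUND_WORKERS", "1"):
    workers_seen = os.environ["OPIK_BACKGROUND_WORKERS"]
```

Restoring the previous value on exit keeps the override scoped to the migrate run instead of leaking into the rest of the process.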

Also adds DEBUG-gated diagnostics so a future failing run is grep-able:

  • per-version source counts + post-apply re-fetch of destination (items_total, streamed count) with WARNING on mismatch in version_replay.py
  • per-experiment cascade summary in experiments.py (source items, trace ids, span/comment counts attempted)
  • migrate-scoped httpx event hook (_debug_http.py) tagging every outbound request with thread id, start/end timestamps, elapsed ms, and a per-host in_flight counter; installed only when the opik logger is at DEBUG so production callers pay zero overhead. Idempotent across repeat calls (review fix at 6932701f1).
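The per-host in_flight counter described in the last bullet can be sketched with the standard library alone. This is a minimal illustration, not the contents of `_debug_http.py`: the class name `InFlightTracker`, its method signatures, and the record layout are assumptions. With httpx, callbacks of this kind would typically be attached through the client's `event_hooks` for "request" and "response".

```python
import threading
import time
from collections import defaultdict
from typing import Dict, List, Tuple


class InFlightTracker:
    """Counts concurrently open requests per host (hypothetical stand-in)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight: Dict[str, int] = defaultdict(int)
        self._started: Dict[int, float] = {}
        # One record per finished request:
        # (host, thread id, in_flight at completion, elapsed ms)
        self.records: List[Tuple[str, int, int, float]] = []

    def on_request(self, request_id: int, host: str) -> int:
        """Call when a request is sent; returns in_flight including this one."""
        with self._lock:
            self._in_flight[host] += 1
            self._started[request_id] = time.monotonic()
            return self._in_flight[host]

    def on_response(self, request_id: int, host: str) -> float:
        """Call when the response arrives; returns elapsed milliseconds."""
        with self._lock:
            elapsed_ms = (time.monotonic() - self._started.pop(request_id)) * 1000.0
            in_flight_now = self._in_flight[host]
            self._in_flight[host] -= 1
            self.records.append((host, threading.get_ident(), in_flight_now, elapsed_ms))
            return elapsed_ms


# Two overlapping requests to the same host.
tracker = InFlightTracker()
first = tracker.on_request(1, "localhost")
second = tracker.on_request(2, "localhost")
tracker.on_response(1, "localhost")
tracker.on_response(2, "localhost")
```

Holding one lock around both the counter and the timestamps keeps the reported in_flight value consistent even when several threads finish at once.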

Adds a stress E2E (tests/e2e/cli/test_migrate_dataset_stress_e2e.py) mirroring the QA repro shapes (qa-v1-stress-traces-5k: 100 items + 5000 cascade; qa-v1-stress-mega: 1458 items × 27 versions). The test asserts exact counts at every layer (dataset_items_count, per-version items_total, per-version stream, experiment items) so a future regression that silently drops items fails loudly. Slow shapes marked with pytest.mark.slow. Seed bulk-writes wrap with ensure_rest_api_call_respecting_rate_limit (commit 2006b472b) so the test is runnable on rate-limited environments.
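The "exact counts at every layer" idea can be illustrated with a small comparison helper. This is a sketch under stated assumptions: `find_count_mismatches` and the layer keys are hypothetical names, not the helpers used in the test file, and the numbers are made up for the example.

```python
from typing import Dict, List


def find_count_mismatches(expected: Dict[str, int], actual: Dict[str, int]) -> List[str]:
    """Return one message per layer whose actual count differs from expected."""
    problems = []
    for layer, want in expected.items():
        got = actual.get(layer)
        if got != want:
            problems.append(f"{layer}: expected {want}, got {got}")
    return problems


# Illustrative numbers only: 100 source items, one item silently dropped
# from the v3 stream while the headline totals still look correct.
expected = {"dataset_items_count": 100, "v3.items_total": 100, "v3.streamed": 100}
actual = {"dataset_items_count": 100, "v3.items_total": 100, "v3.streamed": 99}
mismatches = find_count_mismatches(expected, actual)
```

Checking each layer separately is what makes a single dropped item visible; a check on the top-level count alone would pass in this example.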

Honest scope. The user-visible data loss QA reported requires concurrent writers on the same dataset_version chain (OPIK-6600) or a multi-replica ClickHouse visibility window (OPIK-6601). Migrate by itself does NOT manufacture dataset-endpoint concurrency: the new httpx interceptor showed 63 dataset writes, all at in_flight ≤ 1, across a mega-shape run. The staging A/B (below) further confirms that Option A (forcing background_workers=1) doesn't change pass/fail status on the multi-replica BE topology QA used. This PR is defensive hardening + diagnostics + a regression net; the user-visible data loss must still be addressed BE-side under OPIK-6600 / OPIK-6601.

Change checklist

  • User facing
  • Documentation update

Issues

  • OPIK-6602

AI-WATERMARK

AI-WATERMARK: yes

  • Tools: Claude Code (claude-opus-4-7, 1M context)
  • Model(s): Claude Opus 4.7
  • Scope: full implementation (code + tests + diagnostics + commit message)
  • Human verification: code review by author, manual run of migrate CLI with DEBUG, full stress E2E suite passes (3/3) locally + against staging both with and without the fix

Testing

Tested on two BE topologies:

Local Opik backend (http://localhost:5174/api, single-replica ClickHouse, default opik.sh --backend setup):

  • Stress E2E: pytest tests/e2e/cli/test_migrate_dataset_stress_e2e.py -v → 3/3 PASS in 57.7s
  • DEBUG run: confirmed the "migrate.http hook installed" log line fires; per-version POST-APPLY counts agree at every version; 39 of 40 dataset HTTP windows were at in_flight=1 and one was at in_flight=2 (background is-alive/ping)

Staging (https://staging.dev.comet.com/opik/api, multi-replica — the BE topology QA was on): ran the full stress matrix WITH and WITHOUT Option A to A/B the effect:

| Shape | Run 1 (with fix, background_workers=1) | Run 2 (without fix, background_workers=4) |
| --- | --- | --- |
| smoke | ✅ PASS | ✅ PASS |
| mega (1458 × 27) | ✅ PASS | ✅ PASS |
| traces-5k (5000 cascade) | ✅ PASS (6m30s) | ✅ PASS (7m16s) |

Both configurations pass at QA-equivalent scale on staging. Option A's 5 → 2 reduction doesn't change pass/fail on the topologies tested.

Negative test: ran a separate same-dataset concurrent script (5 threads × 10 rounds × 5 items on ONE shared dataset) — reproduces OPIK-6600's symptoms reliably (76-80% data loss, 3/3 runs). Confirms our diagnostics + test infrastructure can catch the BE bug when same-dataset concurrency is present; confirms migrate alone doesn't produce that concurrency.
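The shape of that negative test (5 threads × 10 rounds × 5 items on one shared target) can be sketched as a small harness. This is not the script used for the PR: `run_concurrent_writers`, `loss_percent`, and the in-memory `fake_write` target are hypothetical. Because the stand-in target is lock-protected, it loses nothing; the reported loss only appears against a backend with the OPIK-6600 behavior.

```python
import threading
from typing import Callable, List


def run_concurrent_writers(
    write_items: Callable[[List[str]], None],
    threads: int = 5,
    rounds: int = 10,
    items_per_round: int = 5,
) -> int:
    """Start `threads` writers against one shared target; return items attempted."""

    def worker(thread_idx: int) -> None:
        for round_idx in range(rounds):
            batch = [
                f"t{thread_idx}-r{round_idx}-i{item_idx}"
                for item_idx in range(items_per_round)
            ]
            write_items(batch)

    workers = [threading.Thread(target=worker, args=(i,)) for i in range(threads)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return threads * rounds * items_per_round


def loss_percent(attempted: int, stored: int) -> float:
    """Share of attempted items missing from the target, in percent."""
    return 100.0 * (attempted - stored) / attempted


# In-memory stand-in for the shared dataset; the lock makes it lossless.
store: List[str] = []
store_lock = threading.Lock()


def fake_write(batch: List[str]) -> None:
    with store_lock:
        store.extend(batch)


attempted = run_concurrent_writers(fake_write)
```

Swapping `fake_write` for a real dataset write and comparing `attempted` with the count read back would give the loss figure for a given backend.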

Pre-commit: ruff, ruff format, mypy → all pass.

Documentation

N/A: debug-only diagnostics, no new user-facing behavior. The single user-observable change is a possibly slightly slower experiment cascade for very large datasets, because the streamer now has a single consumer. On staging, traces-5k was actually faster with the fix (6m30s, versus 7m16s without it) because of less BE contention, but local Docker doesn't reproduce that. The change is documented in the in-code comment at cli/migrate/main.py:_build_client.

…diagnostics

Forces ``OPIK_BACKGROUND_WORKERS=1`` for the duration of ``opik migrate``
so the streamer never has multiple in-flight HTTP writes against the
same workspace during the experiment cascade. Migrate's dataset writes
were already serialized on the main thread; this collapses the
trace/span/feedback streamer to a single consumer too, delivering the
"no manufactured workspace-level concurrency" property the ticket asked
for.

Adds DEBUG-gated diagnostics so a future failing run is grep-able:

* per-version source counts + post-apply re-fetch of destination
  (items_total, streamed count) with WARNING on mismatch in
  version_replay.py
* per-experiment cascade summary in experiments.py
* migrate-scoped httpx event hook (_debug_http.py) tagging every
  outbound request with thread id, start/end timestamps, elapsed ms,
  and a per-host in_flight counter; installed only when the opik
  logger is at DEBUG so production callers pay zero overhead

Adds a stress E2E (tests/e2e/cli/test_migrate_dataset_stress_e2e.py)
mirroring the QA repro shapes (qa-v1-stress-traces-5k:
100 items + 5000 cascade; qa-v1-stress-mega: 1458 items x 27 versions).
The test asserts exact counts at every layer (dataset_items_count,
per-version items_total, per-version stream, experiment items) so a
future regression that silently drops items would fail loudly. Marked
slow shapes with pytest.mark.slow.

Note on scope: Andrei's data-loss repro requires concurrent writers on
the same dataset_version chain (OPIK-6600) or a multi-replica CH
visibility window (OPIK-6601). Migrate by itself does not manufacture
dataset-endpoint concurrency -- confirmed via the new httpx
interceptor showing 63 dataset writes all at in_flight<=1 across a
mega-shape run. This PR is the SDK-side defensive hardening that
removes any future workspace-level cross-entity concurrency; the
user-visible data loss must still be addressed BE-side under
OPIK-6600/OPIK-6601.

Implements OPIK-6602: opik migrate must not produce concurrent
same-workspace writes from a single process.
@github-actions github-actions Bot added the python, tests, and Python SDK labels May 20, 2026
Comment on lines +339 to +346
tgt_dataset_row = rest.datasets.get_dataset_by_identifier(
    dataset_name=dataset_name, project_name=target_project_name
)
tgt_versions = chronological_versions(rest, tgt_dataset_row.id)
assert len(tgt_versions) == num_versions, (
    f"target version count {len(tgt_versions)} != source {num_versions}"
)
assert tgt_dataset_row.dataset_items_count == total_source_items, (
Contributor

Should we use the shared E2E verifiers (verify_dataset/verify_experiment) instead of duplicating the dataset/experiment checks via raw REST reads (get_dataset_by_identifier + chronological_versions), since the custom path can be flaky under normal propagation/transient timing? If we still need per-version items_total/streamed-count, can we add a dedicated version-chain verifier (or push those assertions into a lower-level suite) rather than redoing the whole verification flow here?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/tests/e2e/cli/test_migrate_dataset_stress_e2e.py around lines 339-370 inside
`test_migrate_dataset__stress__no_silent_data_loss`, replace the raw destination
verification logic that uses `rest.datasets.get_dataset_by_identifier` +
`chronological_versions` + direct `assert len(tgt_versions)` / `dataset_items_count`
with the shared E2E verifier(s) from `tests.e2e.verifiers` (use `verify_dataset` /
`verify_experiment` as appropriate). Refactor so the destination dataset/experiment
assertions use those helpers’ built-in retry/wait to handle propagation lag. Then
extract the remaining per-version checks (target vN `items_total` and per-version
streamed item count vs `src_items_per_version`) into a dedicated version-chain verifier
helper (or move those checks into an existing lower-level suite) that can be
retried/waited for each version_hash, rather than asserting immediately after the
migration.

Contributor Author

Skipping — the existing migrate E2E tests in this same folder (e.g. test_migrate_dataset_e2e.py, test_migrate_dataset_cross_project_cascade_e2e.py) all verify destinations via direct REST reads through the conftest helpers (chronological_versions, stream_items_wire, destination_experiment_items). Using shared verify_dataset/verify_experiment would either (a) introduce abstraction not used elsewhere in tests/e2e/cli/ or (b) hide the per-version items_total assertions that are the whole point of this stress test (mirroring the QA-failure shape). The "flakiness under propagation" concern doesn't apply here: migrate runs as a synchronous CLI subprocess via subprocess.run, so by the time run_migrate_cli() returns, every write has been flushed and the audit log written — there's no propagation race to wait out.

🤖 Reply posted via /address-github-pr-comments

Contributor

Thanks, that makes sense. I’ll save this to memory once the PR is merged.

Comment on lines +359 to +363
for page_idx in (1, 2, 3):
    page = rest_helpers.ensure_rest_api_call_respecting_rate_limit(
        lambda p=page_idx: rest_client.datasets.list_dataset_versions(
            id=dest_dataset_id, page=p, size=_VERSIONS_PAGE_SIZE
        )
Contributor

_log_post_apply_verification only checks list_dataset_versions pages 1-3 (100/page), so in a write race the new_version_id can land on page 4+ and get a false NOT FOUND warning; can we keep paging until the id shows up or we hit an empty page?

Finding type: Logical Bugs | Severity: 🟢 Low


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/datasets/version_replay.py around lines 359-371 inside
`_log_post_apply_verification`, replace the fixed `for page_idx in (1, 2, 3)` pagination
with a loop that continues fetching subsequent pages until either the `new_version_id`
is found or the API returns an empty page (no content), so the diagnostic doesn’t
falsely warn under high-write races. Keep the rate-limit-aware retry wrapper
(`ensure_rest_api_call_respecting_rate_limit`) for each page request, and add a small
safety cap (e.g., max pages) if needed to prevent unbounded pagination. Update the
surrounding comment to reflect the new termination condition (found or empty page).

Contributor Author

Skipping — this is the diagnostic helper, not the production write path. It runs immediately after we wrote the version on the main thread, so the version we just wrote will always be on page 1 (newest-first ordering). The 3-page defense covers the rare case of another writer racing in within milliseconds; even there, 300 versions of buffer is well past realistic. Unbounded pagination here would be a bigger risk: this fires per-version on every migrate and would blow up on a workspace with thousands of versions. The current 3-page cap with a NOT FOUND warning is the right severity-vs-cost tradeoff for a diagnostic.

🤖 Reply posted via /address-github-pr-comments
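The tradeoff discussed in this thread (a fixed page cap versus paging until the id appears or a page comes back empty) can be combined in one bounded search. The sketch below is hypothetical: `find_on_first_pages` and the `fetch_page` callable are illustrative names, not the code in `version_replay.py`, and the real helper also wraps each call in the rate-limit retry.

```python
from typing import Callable, Dict, List, Optional, Sequence


def find_on_first_pages(
    fetch_page: Callable[[int], Sequence[str]],
    wanted_id: str,
    max_pages: int = 3,
) -> Optional[int]:
    """Return the 1-based page holding wanted_id, or None if not found within the cap."""
    for page_number in range(1, max_pages + 1):
        ids = fetch_page(page_number)
        if not ids:
            return None  # ran out of data before reaching the cap
        if wanted_id in ids:
            return page_number
    return None  # cap reached; the caller decides whether to warn


# Newest-first listing with two ids per page (illustrative data).
pages: Dict[int, List[str]] = {1: ["v9", "v8"], 2: ["v7", "v6"], 3: ["v5", "v4"], 4: ["v3"]}
calls: List[int] = []


def fetch(page_number: int) -> List[str]:
    calls.append(page_number)
    return pages.get(page_number, [])


page_of_v7 = find_on_first_pages(fetch, "v7")
page_of_v3 = find_on_first_pages(fetch, "v3")  # lives on page 4, beyond the cap
```

The cap keeps the per-version cost constant on workspaces with many versions, at the price of a possible false "not found" for ids beyond the cap.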

Contributor

Thanks for the clarification — that diagnostic-only context and bounded pagination tradeoff makes sense here. I'll save this to memory once the PR is merged.

Comment on lines +422 to +426
except Exception as exc:
    # Diagnostic must never break the migrate. Log at debug + move on.
    LOGGER.debug(
        "migrate.replay v%d/%d POST-APPLY verification failed: %s",
        index + 1,
Contributor

_log_post_apply_verification() catches broad except Exception and only debug-logs, suppressing real readback/verification failures (non-429 ApiErrors, parse/runtime errors, etc.) instead of surfacing them—should we narrow the catch to a recoverable type or re-raise after logging?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/datasets/version_replay.py around lines 422-430, in the
`_log_post_apply_verification()` exception handler, stop catching generic `Exception`
and suppressing it with only a DEBUG log. Refactor so you either (a) catch only the
specific recoverable exception types that can happen during the post-write verification
(e.g., the rate-limit helper’s exhausted-retry / transient transport exceptions) or
(b) re-raise after logging for non-recoverable verification failures like API 4xx/5xx,
stream iteration/parsing errors, or unexpected runtime errors. Additionally, adjust the
call site in `replay_all_versions()` around lines 280-291 so that a verification failure
is not silently treated as success (e.g., have `_log_post_apply_verification()` return a
status and log WARNING / propagate the error depending on recoverability).

Contributor Author

Skipping — the broad catch is intentional and documented in the function's docstring at the bottom: "Diagnostic must never break the migrate. Log at debug + move on." This helper runs AFTER the apply call has returned successfully (the actual migration write succeeded). If the verification re-fetch itself transiently fails (network blip, BE 500 on a read endpoint, parsing edge case), re-raising would turn a successful migration into a hard failure for a check-after-the-fact. The current design lets the migrate succeed and surfaces verification failures only when they're informative (count mismatches at WARNING). Narrowing the catch to recoverable types only would still let unexpected failures abort the migrate, which is the wrong tradeoff for a diagnostic.

🤖 Reply posted via /address-github-pr-comments
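The "diagnostic must never break the migrate" pattern defended above can be sketched as a wrapper that swallows every failure of a best-effort check. `run_diagnostic` is a hypothetical name, and the boolean return value is an addition for illustration; per the reply above, the PR's helper only logs at DEBUG and moves on.

```python
import logging
from typing import Callable

LOGGER = logging.getLogger("migrate.sketch")


def run_diagnostic(check: Callable[[], None], label: str) -> bool:
    """Run a best-effort check; log and swallow any failure. Returns True on success."""
    try:
        check()
        return True
    except Exception as exc:  # deliberately broad: a diagnostic must not abort the caller
        LOGGER.debug("%s POST-APPLY verification failed: %s", label, exc)
        return False


def passing_check() -> None:
    """A read-back that succeeds."""


def failing_check() -> None:
    """A read-back that hits a transient error."""
    raise RuntimeError("transient read failure")


ok = run_diagnostic(passing_check, "v1/27")
failed = run_diagnostic(failing_check, "v2/27")
```

Catching `Exception` rather than `BaseException` still lets `KeyboardInterrupt` and `SystemExit` propagate, so the user can interrupt a long run.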

Contributor

Thanks for the clarification — that makes sense for a post-apply diagnostic path. I'll remember this and save it to memory once the PR is merged.

Comment on lines +110 to +115
opik_hooks.add_httpx_client_hook(
    opik_hooks.HttpxClientHook(
        client_modifier=_modifier,
        client_init_arguments=None,
    )
)
Contributor

install_if_debug() isn’t idempotent: each DEBUG call creates a new HttpxClientHook and add_httpx_client_hook() just appends to _httpx_client_hooks, so later opik.httpx_client.get() applies duplicate hooks and _on_request/_on_response get registered twice. Can we guard/dedupe before appending?

Finding type: Logical Bugs | Severity: 🟢 Low


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/_debug_http.py around lines 88-115, the
install_if_debug() function is not truly idempotent: it always creates a new
HttpxClientHook and calls opik_hooks.add_httpx_client_hook(), which appends to a global
hook list, so multiple calls register _on_request/_on_response twice and duplicate log
lines. Add a module-level guard (e.g., a global “_hook_installed” flag) or an
explicit dedupe check that detects whether an equivalent migrate debug hook has already
been added before appending. Ensure repeated calls are harmless and update the docstring
to match the real behavior.

Contributor Author

Fixed in 6932701 — added a module-level _installed flag protected by _install_lock. Second and subsequent calls short-circuit before appending another HttpxClientHook. Verified locally: calling install_if_debug() three times produces exactly one hook registration in opik.hooks.httpx_client_hook._httpx_client_hooks.

🤖 Reply posted via /address-github-pr-comments

Contributor

Commit 6932701 addressed this comment by adding an _installed flag plus _install_lock around install_if_debug, so the hook registration now short-circuits after the first DEBUG call and prevents duplicate HttpxClientHook entries.

Adds a module-level _installed flag protected by _install_lock. The
SDK's add_httpx_client_hook only appends to a global list and
apply_httpx_client_hooks fans every entry over each new httpx.Client,
so a second registration would fire _on_request / _on_response twice
per request and corrupt the in_flight counter.

Concretely matters when the module is imported from a test process or
called more than once in any one Python session; in the migrate CLI
happy path it was already only called once at startup.

Addresses baz-reviewer comment on PR #6807 (_debug_http.py:115).
@JetoPistola JetoPistola marked this pull request as ready for review May 20, 2026 19:12
@JetoPistola JetoPistola requested a review from a team as a code owner May 20, 2026 19:12
@JetoPistola JetoPistola added the test-environment label May 20, 2026
@github-actions
Contributor

🔄 Test environment deployment process has started

Phase 1: Deploying base version 2.0.43-5411 (from main branch) if environment doesn't exist
Phase 2: Building new images from PR branch danield/OPIK-6602-migrate-avoid-concurrent-writes
Phase 3: Will deploy newly built version after build completes

You can monitor the progress here.

@CometActions
Collaborator

Test environment is now available!

To configure additional environment variables for your environment, run the Deploy Opik AdHoc Environment workflow (https://github.com/comet-ml/comet-deployment/actions/workflows/deploy_opik_adhoc_env.yaml)

Access Information

The deployment has completed successfully and the version has been verified.

The stress E2E's seeding helpers call create_traces / create_spans /
create_experiment_items / create_or_update_dataset_items / apply_changes
directly against the raw Fern client. On local Docker that's fine
(no rate limits), but on staging the per-user general_events limit
trips at the 5000-trace cascade and the test crashes mid-seed with
HTTP 429 before migrate even starts.

Wraps every bulk seeding call with rest_helpers.ensure_rest_api_call_respecting_rate_limit
(the same helper version_replay.py uses) so the test is runnable on
rate-limited environments. The retry observes the BE's
ratelimit-reset header and sleeps accordingly.

No change to migrate code; this only affects the test's seed phase.
@JetoPistola
Contributor Author

Update: staging A/B + framing correction

Tested OPIK-6602 against staging (https://staging.dev.comet.com/opik/api, multi-replica BE — the topology QA was on). Ran the full stress matrix with and without Option A:

| Shape | Run 1 (with fix, background_workers=1) | Run 2 (without fix, background_workers=4) |
| --- | --- | --- |
| smoke (60 items + 100 cascade) | ✅ PASS | ✅ PASS |
| mega (1458 items × 27 versions) | ✅ PASS | ✅ PASS |
| traces-5k (100 items + 5000 cascade) | ✅ PASS (6m30s) | ✅ PASS (7m16s) |

Both pass at QA-equivalent scale on staging. Option A doesn't change pass/fail status on the topologies I've been able to test; the difference between the 5 → 2 in-flight reduction it actually delivers and the 5 → 1 originally implied (see the framing correction below) didn't matter empirically.

Framing correction (thanks to the reviewer who flagged this): Option A reduces workspace-level in-flight HTTP writes from 5 → 2, not 5 → 1 as I'd originally implied. Streamer consumers go from 4 → 1, but the main thread can still overlap with the single remaining consumer. The "no concurrent writes" property is delivered only at moments when either main is idle or the streamer queue is empty — true much of the time, but not always.

To actually deliver 1-in-flight we'd need client.flush() boundaries between every main-thread REST phase, which is a much larger change with no demonstrated benefit on the BE topologies tested. Option A is therefore best understood as small defensive hardening, not a fix for the QA-reported data loss — which the DEBUG findings already established needs OPIK-6600 / OPIK-6601 BE-side.
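The 5 → 2 arithmetic in the framing correction is simply "every streamer consumer plus the main thread". A one-function sketch, with a hypothetical function name and the assumption that each writer has at most one HTTP write open at a time:

```python
def max_in_flight_writes(streamer_consumers: int, main_thread_writers: int = 1) -> int:
    """Upper bound on simultaneous HTTP writes: every consumer plus the main thread."""
    return streamer_consumers + main_thread_writers


before = max_in_flight_writes(4)  # 4 streamer consumers + main thread -> 5
after = max_in_flight_writes(1)   # background_workers=1 + main thread -> 2
```

Reaching 1 would require the main thread and the single consumer never to overlap, which is what the flush boundaries mentioned above would enforce.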

What's in the PR (honest scope):

  1. background_workers=1 for migrate — reduces 5 → 2 in-flight (smaller win than originally claimed, but real)
  2. DEBUG-gated diagnostics — per-version source/target counts with WARNING on mismatch + httpx hook with in_flight counter + thread id. These have already caught + categorized real behavior during PR investigation
  3. Stress E2E — mirrors qa-v1-stress-traces-5k / qa-v1-stress-mega shapes; commit 2006b472b wraps seed calls with 429-aware retry so the test is runnable against rate-limited environments like staging

Updated reply marker: 🤖

Comment on lines +156 to +160
# Wrap bulk writes with the SDK's 429-aware retry helper so this test
# is runnable on rate-limited environments (staging enforces per-user
# general_events limits that local Docker does not).
for chunk in _chunked(trace_writes, _BULK_INSERT_CHUNK):
    rest_helpers.ensure_rest_api_call_respecting_rate_limit(
Contributor

Wrapping the bulk seed calls in ensure_rest_api_call_respecting_rate_limit() can make the test hang on sustained HTTP 429s since the helper has no max-attempt/overall timeout and keeps looping after sleeping. Can we bound retries or add a test-level timeout so it fails cleanly?

Finding type: Logical Bugs | Severity: 🟢 Low


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/tests/e2e/cli/test_migrate_dataset_stress_e2e.py around lines 156-160 (and
the other bulk seed wrappers in the same flow: lines ~165-181, 194-199, 247-253, and
268-276), the calls to rest_helpers.ensure_rest_api_call_respecting_rate_limit around
create_traces/create_spans/create_experiment/create_experiment_items/create_or_update_dataset_items/apply_changes
can loop forever on sustained HTTP 429s because the helper appears to lack a max-attempt
or overall timeout. Update the helper invocation (or add a small local bounded wrapper
in this test) to enforce a maximum retry count and/or an overall time budget per
operation, so the test raises a clear failure once the budget is exhausted rather than
hanging. Ensure the operation is retried only within that budget and that the failure
message includes the operation_name for easier debugging.
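A bounded wrapper of the kind this comment asks for could look like the sketch below. It is hypothetical throughout: `RateLimited` stands in for whatever error the client raises on HTTP 429, `call_with_bounded_retry` is not an SDK function, and the real `ensure_rest_api_call_respecting_rate_limit` helper may behave differently.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 error carrying a reset delay."""

    def __init__(self, retry_after_seconds: float) -> None:
        super().__init__(f"rate limited, retry after {retry_after_seconds}s")
        self.retry_after_seconds = retry_after_seconds


def call_with_bounded_retry(
    operation: Callable[[], T],
    operation_name: str,
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry on RateLimited up to max_attempts, then fail with a clear message."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except RateLimited as exc:
            if attempt == max_attempts:
                raise RuntimeError(
                    f"{operation_name}: still rate limited after {max_attempts} attempts"
                ) from exc
            sleep(exc.retry_after_seconds)
    raise AssertionError("unreachable")


# Simulated seed call that is rate limited twice, then succeeds.
# Sleeps are recorded instead of performed so the example runs instantly.
sleeps: List[float] = []
remaining_failures = [2]


def flaky_seed() -> str:
    if remaining_failures[0] > 0:
        remaining_failures[0] -= 1
        raise RateLimited(0.5)
    return "seeded"


result = call_with_bounded_retry(flaky_seed, "create_traces", sleep=sleeps.append)
```

Injecting `sleep` keeps the wrapper testable without real delays; a wall-clock budget could be added alongside the attempt cap in the same loop.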

@CometActions
Collaborator

🌙 Nightly cleanup: The test environment for this PR (pr-6807) has been cleaned up to free cluster resources. PVCs are preserved — re-deploy to restore the environment.

@CometActions CometActions removed the test-environment label May 21, 2026
