[OPIK-6602] [SDK] feat: serialize migrate streamer + add concurrency diagnostics #6807
JetoPistola wants to merge 3 commits into
Conversation
…diagnostics

Forces ``OPIK_BACKGROUND_WORKERS=1`` for the duration of ``opik migrate`` so the streamer never has multiple in-flight HTTP writes against the same workspace during the experiment cascade. Migrate's dataset writes were already serialized on the main thread; this collapses the trace/span/feedback streamer to a single consumer too, delivering the "no manufactured workspace-level concurrency" property the ticket asked for.

Adds DEBUG-gated diagnostics so a future failing run is grep-able:

* per-version source counts + post-apply re-fetch of destination (items_total, streamed count) with WARNING on mismatch in version_replay.py
* per-experiment cascade summary in experiments.py
* migrate-scoped httpx event hook (_debug_http.py) tagging every outbound request with thread id, start/end timestamps, elapsed ms, and a per-host in_flight counter; installed only when the opik logger is at DEBUG so production callers pay zero overhead

Adds a stress E2E (tests/e2e/cli/test_migrate_dataset_stress_e2e.py) mirroring the QA repro shapes (qa-v1-stress-traces-5k: 100 items + 5000 cascade; qa-v1-stress-mega: 1458 items x 27 versions). The test asserts exact counts at every layer (dataset_items_count, per-version items_total, per-version stream, experiment items) so a future regression that silently drops items would fail loudly. Marked slow shapes with pytest.mark.slow.

Note on scope: Andrei's data-loss repro requires concurrent writers on the same dataset_version chain (OPIK-6600) or a multi-replica CH visibility window (OPIK-6601). Migrate by itself does not manufacture dataset-endpoint concurrency -- confirmed via the new httpx interceptor showing 63 dataset writes all at in_flight<=1 across a mega-shape run. This PR is the SDK-side defensive hardening that removes any future workspace-level cross-entity concurrency; the user-visible data loss must still be addressed BE-side under OPIK-6600/OPIK-6601.
Implements OPIK-6602: opik migrate must not produce concurrent same-workspace writes from a single process.
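One way to force a setting such as OPIK_BACKGROUND_WORKERS=1 only for the duration of a command is a small context manager around the environment. The sketch below is illustrative and is not taken from the PR: forced_env is a hypothetical helper, and the PR may apply the override through a different mechanism.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Optional


@contextmanager
def forced_env(name: str, value: str) -> Iterator[None]:
    """Set an environment variable for the duration of a block, then restore it.

    Hypothetical helper for illustration; not part of the Opik SDK.
    """
    previous: Optional[str] = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        # Restore the caller's original value, or remove ours if none existed.
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous


# Assumed usage: construct the migrate-time client inside the block so that
# it reads the single-worker setting.
with forced_env("OPIK_BACKGROUND_WORKERS", "1"):
    workers_during_migrate = os.environ["OPIK_BACKGROUND_WORKERS"]
```

Restoring the previous value in a finally block keeps the override from leaking into other code in the same process, for example when the CLI entry point is invoked from a test.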
    tgt_dataset_row = rest.datasets.get_dataset_by_identifier(
        dataset_name=dataset_name, project_name=target_project_name
    )
    tgt_versions = chronological_versions(rest, tgt_dataset_row.id)
    assert len(tgt_versions) == num_versions, (
        f"target version count {len(tgt_versions)} != source {num_versions}"
    )
    assert tgt_dataset_row.dataset_items_count == total_source_items, (
Should we use the shared E2E verifiers (verify_dataset/verify_experiment) instead of duplicating the dataset/experiment checks via raw REST reads (get_dataset_by_identifier + chronological_versions), since the custom path can be flaky under normal propagation/transient timing? If we still need per-version items_total/streamed-count, can we add a dedicated version-chain verifier (or push those assertions into a lower-level suite) rather than redoing the whole verification flow here?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/tests/e2e/cli/test_migrate_dataset_stress_e2e.py around lines 339-370 inside
`test_migrate_dataset__stress__no_silent_data_loss`, replace the raw destination
verification logic that uses `rest.datasets.get_dataset_by_identifier` +
`chronological_versions` + direct `assert len(tgt_versions)` / `dataset_items_count`
with the shared E2E verifier(s) from `tests.e2e.verifiers` (use `verify_dataset` /
`verify_experiment` as appropriate). Refactor so the destination dataset/experiment
assertions use those helpers’ built-in retry/wait to handle propagation lag. Then
extract the remaining per-version checks (target vN `items_total` and per-version
streamed item count vs `src_items_per_version`) into a dedicated version-chain verifier
helper (or move those checks into an existing lower-level suite) that can be
retried/waited for each version_hash, rather than asserting immediately after the
migration.
Skipping — the existing migrate E2E tests in this same folder (e.g. test_migrate_dataset_e2e.py, test_migrate_dataset_cross_project_cascade_e2e.py) all verify destinations via direct REST reads through the conftest helpers (chronological_versions, stream_items_wire, destination_experiment_items). Using shared verify_dataset/verify_experiment would either (a) introduce abstraction not used elsewhere in tests/e2e/cli/ or (b) hide the per-version items_total assertions that are the whole point of this stress test (mirroring the QA-failure shape). The "flakiness under propagation" concern doesn't apply here: migrate runs as a synchronous CLI subprocess via subprocess.run, so by the time run_migrate_cli() returns, every write has been flushed and the audit log written — there's no propagation race to wait out.
🤖 Reply posted via /address-github-pr-comments
Thanks, that makes sense. I’ll save this to memory once the PR is merged.
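For illustration, the per-version count comparison discussed in this thread can be sketched as a small pure function. This is a sketch under stated assumptions: find_count_mismatches and its record shape are hypothetical and are not helpers from tests/e2e/cli/ or tests.e2e.verifiers.

```python
from typing import Dict, List, Sequence


def find_count_mismatches(
    source_counts: Sequence[int],
    target_counts: Sequence[int],
) -> List[Dict[str, int]]:
    """Return one record per version whose target item count differs from the source.

    Both sequences are expected in chronological version order (v1 first).
    """
    if len(source_counts) != len(target_counts):
        raise AssertionError(
            f"target version count {len(target_counts)} "
            f"!= source {len(source_counts)}"
        )
    mismatches: List[Dict[str, int]] = []
    for index, (expected, actual) in enumerate(zip(source_counts, target_counts)):
        if expected != actual:
            mismatches.append(
                {"version": index + 1, "expected": expected, "actual": actual}
            )
    return mismatches
```

Collecting every mismatch instead of failing on the first one makes a failing run easier to read, since a silent drop often affects several versions at once.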
    for page_idx in (1, 2, 3):
        page = rest_helpers.ensure_rest_api_call_respecting_rate_limit(
            lambda p=page_idx: rest_client.datasets.list_dataset_versions(
                id=dest_dataset_id, page=p, size=_VERSIONS_PAGE_SIZE
            )
_log_post_apply_verification only checks list_dataset_versions pages 1-3 (100/page), so in a write race the new_version_id can land on page 4+ and get a false NOT FOUND warning; can we keep paging until the id shows up or we hit an empty page?
Finding type: Logical Bugs | Severity: 🟢 Low
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/datasets/version_replay.py around lines 359-371 inside
`_log_post_apply_verification`, replace the fixed `for page_idx in (1, 2, 3)` pagination
with a loop that continues fetching subsequent pages until either the `new_version_id`
is found or the API returns an empty page (no content), so the diagnostic doesn’t
falsely warn under high-write races. Keep the rate-limit-aware retry wrapper
(`ensure_rest_api_call_respecting_rate_limit`) for each page request, and add a small
safety cap (e.g., max pages) if needed to prevent unbounded pagination. Update the
surrounding comment to reflect the new termination condition (found or empty page).
Skipping — this is the diagnostic helper, not the production write path. It runs immediately after we wrote the version on the main thread, so the version we just wrote will always be on page 1 (newest-first ordering). The 3-page defense covers the rare case of another writer racing in within milliseconds; even there, 300 versions of buffer is well past realistic. Unbounded pagination here would be a bigger risk: this fires per-version on every migrate and would blow up on a workspace with thousands of versions. The current 3-page cap with a NOT FOUND warning is the right severity-vs-cost tradeoff for a diagnostic.
🤖 Reply posted via /address-github-pr-comments
Thanks for the clarification — that diagnostic-only context and bounded pagination tradeoff makes sense here. I'll save this to memory once the PR is merged.
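The tradeoff in this thread (a fixed page cap versus paging until an empty page) can be combined in one bounded search. The sketch below is an assumption-laden illustration, not the code in version_replay.py: find_on_first_pages and fetch_page are hypothetical names, and fetch_page stands in for a rate-limit-aware call that returns the version ids on a given page.

```python
from typing import Callable, Optional, Sequence


def find_on_first_pages(
    fetch_page: Callable[[int], Sequence[str]],
    wanted_id: str,
    max_pages: int = 3,
) -> Optional[int]:
    """Return the 1-based page that contains wanted_id, or None.

    The search stops early on an empty page and never reads more than
    max_pages pages, so the cost per call stays bounded.
    """
    for page_idx in range(1, max_pages + 1):
        ids = fetch_page(page_idx)
        if not ids:
            # An empty page means there is nothing further to scan.
            return None
        if wanted_id in ids:
            return page_idx
    return None
```

A None result only says the id was not seen within the cap, which matches the diagnostic's NOT FOUND warning rather than a hard failure.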
    except Exception as exc:
        # Diagnostic must never break the migrate. Log at debug + move on.
        LOGGER.debug(
            "migrate.replay v%d/%d POST-APPLY verification failed: %s",
            index + 1,
_log_post_apply_verification() catches broad except Exception and only debug-logs, suppressing real readback/verification failures (non-429 ApiErrors, parse/runtime errors, etc.) instead of surfacing them—should we narrow the catch to a recoverable type or re-raise after logging?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/datasets/version_replay.py around lines 422-430, in the
`_log_post_apply_verification()` exception handler, stop catching generic `Exception`
and suppressing it with only a DEBUG log. Refactor so you either (a) catch only the
specific recoverable exception types that can happen during the post-write verification
(e.g., the rate-limit helper’s exhausted-retry / transient transport exceptions) or
(b) re-raise after logging for non-recoverable verification failures like API 4xx/5xx,
stream iteration/parsing errors, or unexpected runtime errors. Additionally, adjust the
call site in `replay_all_versions()` around lines 280-291 so that a verification failure
is not silently treated as success (e.g., have `_log_post_apply_verification()` return a
status and log WARNING / propagate the error depending on recoverability).
Skipping — the broad catch is intentional and documented in the function's docstring at the bottom: "Diagnostic must never break the migrate. Log at debug + move on." This helper runs AFTER the apply call has returned successfully (the actual migration write succeeded). If the verification re-fetch itself transiently fails (network blip, BE 500 on a read endpoint, parsing edge case), re-raising would turn a successful migration into a hard failure for a check-after-the-fact. The current design lets the migrate succeed and surfaces verification failures only when they're informative (count mismatches at WARNING). Narrowing the catch to recoverable types only would still let unexpected failures abort the migrate, which is the wrong tradeoff for a diagnostic.
🤖 Reply posted via /address-github-pr-comments
Thanks for the clarification — that makes sense for a post-apply diagnostic path. I'll remember this and save it to memory once the PR is merged.
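The "diagnostic must never break the migrate" pattern defended in this thread can be sketched as a wrapper that swallows failures but still reports them to the caller. This is a hypothetical illustration: run_diagnostic and the logger name are not from the PR, and returning a status is one possible middle ground between the two positions above, not what the PR does.

```python
import logging
from typing import Callable

LOGGER = logging.getLogger("migrate.diagnostics.sketch")  # hypothetical logger name


def run_diagnostic(check: Callable[[], None], label: str) -> bool:
    """Run a post-write check and report whether it completed.

    Any Exception raised by the check is logged at DEBUG and swallowed, so a
    failed read-back cannot turn an already-successful write into a failure.
    """
    try:
        check()
        return True
    except Exception as exc:
        LOGGER.debug("%s verification failed: %s", label, exc)
        return False
```

The boolean lets a call site decide whether to log a WARNING without changing the migrate's exit status.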
    opik_hooks.add_httpx_client_hook(
        opik_hooks.HttpxClientHook(
            client_modifier=_modifier,
            client_init_arguments=None,
        )
    )
install_if_debug() isn’t idempotent: each DEBUG call creates a new HttpxClientHook and add_httpx_client_hook() just appends to _httpx_client_hooks, so later opik.httpx_client.get() applies duplicate hooks and _on_request/_on_response get registered twice. Can we guard/dedupe before appending?
Finding type: Logical Bugs | Severity: 🟢 Low
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/_debug_http.py around lines 88-115, the
install_if_debug() function is not truly idempotent: it always creates a new
HttpxClientHook and calls opik_hooks.add_httpx_client_hook(), which appends to a global
hook list, so multiple calls register _on_request/_on_response twice and duplicate log
lines. Add a module-level guard (e.g., a global “_hook_installed” flag) or an
explicit dedupe check that detects whether an equivalent migrate debug hook has already
been added before appending. Ensure repeated calls are harmless and update the docstring
to match the real behavior.
Fixed in 6932701 — added a module-level _installed flag protected by _install_lock. Second and subsequent calls short-circuit before appending another HttpxClientHook. Verified locally: calling install_if_debug() three times produces exactly one hook registration in opik.hooks.httpx_client_hook._httpx_client_hooks.
🤖 Reply posted via /address-github-pr-comments
Commit 6932701 addressed this comment by adding an _installed flag plus _install_lock around install_if_debug, so the hook registration now short-circuits after the first DEBUG call and prevents duplicate HttpxClientHook entries.
Adds a module-level _installed flag protected by _install_lock. The SDK's add_httpx_client_hook only appends to a global list, and apply_httpx_client_hooks fans every entry over each new httpx.Client, so a second registration would fire _on_request / _on_response twice per request and corrupt the in_flight counter. This matters when the module is imported from a test process or install_if_debug() is called more than once in one Python session; in the migrate CLI happy path it was already called only once at startup.

Addresses baz-reviewer comment on PR #6807 (_debug_http.py:115).
🔄 Test environment deployment process has started Phase 1: Deploying base version You can monitor the progress here. |
✅ Test environment is now available! To configure additional Environment variables for your environment, run [Deploy Opik AdHoc Environment workflow] (https://github.com/comet-ml/comet-deployment/actions/workflows/deploy_opik_adhoc_env.yaml) Access Information
The deployment has completed successfully and the version has been verified. |
The stress E2E's seeding helpers call create_traces / create_spans / create_experiment_items / create_or_update_dataset_items / apply_changes directly against the raw Fern client. On local Docker that's fine (no rate limits), but on staging the per-user general_events limit trips at the 5000-trace cascade and the test crashes mid-seed with HTTP 429 before migrate even starts.

Wraps every bulk seeding call with rest_helpers.ensure_rest_api_call_respecting_rate_limit (the same helper version_replay.py uses) so the test is runnable on rate-limited environments. The retry observes the BE's ratelimit-reset header and sleeps accordingly.

No change to migrate code; this only affects the test's seed phase.
Update: staging A/B + framing correction

Tested OPIK-6602 against staging (

Both pass at QA-equivalent scale on staging. Option A doesn't change pass/fail status on the topologies I've been able to test; the gap from 5 → 2 in-flight HTTP writes vs 5 → 1 reflected below didn't matter empirically.

Framing correction (thanks to the reviewer who flagged this): Option A reduces workspace-level in-flight HTTP writes from 5 → 2, not 5 → 1 as I'd originally implied. Streamer consumers go from 4 → 1, but the main thread can still overlap with the single remaining consumer. The "no concurrent writes" property is delivered only at moments when either main is idle or the streamer queue is empty, which is true much of the time but not always. To actually deliver 1-in-flight we'd need

What's in the PR (honest scope):
Updated reply marker: 🤖 |
    # Wrap bulk writes with the SDK's 429-aware retry helper so this test
    # is runnable on rate-limited environments (staging enforces per-user
    # general_events limits that local Docker does not).
    for chunk in _chunked(trace_writes, _BULK_INSERT_CHUNK):
        rest_helpers.ensure_rest_api_call_respecting_rate_limit(
Wrapping the bulk seed calls in ensure_rest_api_call_respecting_rate_limit() can make the test hang on sustained HTTP 429s since the helper has no max-attempt/overall timeout and keeps looping after sleeping. Can we bound retries or add a test-level timeout so it fails cleanly?
Finding type: Logical Bugs | Severity: 🟢 Low
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/tests/e2e/cli/test_migrate_dataset_stress_e2e.py around lines 156-160 (and
the other bulk seed wrappers in the same flow: lines ~165-181, 194-199, 247-253, and
268-276), the calls to rest_helpers.ensure_rest_api_call_respecting_rate_limit around
create_traces/create_spans/create_experiment/create_experiment_items/create_or_update_dataset_items/apply_changes
can loop forever on sustained HTTP 429s because the helper appears to lack a max-attempt
or overall timeout. Update the helper invocation (or add a small local bounded wrapper
in this test) to enforce a maximum retry count and/or an overall time budget per
operation, so the test raises a clear failure once the budget is exhausted rather than
hanging. Ensure the operation is retried only within that budget and that the failure
message includes the operation_name for easier debugging.
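A local bounded wrapper of the kind suggested here could look like the sketch below. It is deliberately independent of the SDK: RateLimited is a hypothetical stand-in for an HTTP 429 error that carries a retry delay, and call_with_budget is not an existing helper in rest_helpers.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 error carrying a retry delay."""

    def __init__(self, retry_after_s: float) -> None:
        super().__init__(f"rate limited, retry after {retry_after_s}s")
        self.retry_after_s = retry_after_s


def call_with_budget(
    operation: Callable[[], T],
    operation_name: str,
    max_attempts: int = 5,
    budget_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry operation on RateLimited until attempts or the time budget run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    deadline = time.monotonic() + budget_s
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except RateLimited as exc:
            # Give up if this was the last attempt or the next sleep would
            # push us past the overall deadline.
            out_of_time = time.monotonic() + exc.retry_after_s > deadline
            if attempt == max_attempts or out_of_time:
                raise TimeoutError(
                    f"{operation_name}: still rate limited after "
                    f"{attempt} attempt(s)"
                ) from exc
            sleep(exc.retry_after_s)
    raise RuntimeError("unreachable: the loop always returns or raises")
```

Passing sleep as a parameter keeps the wrapper testable without real waiting, and including operation_name in the error addresses the debugging concern raised in the prompt above.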
🌙 Nightly cleanup: The test environment for this PR ( |
Details
opik migrate now forces OPIK_BACKGROUND_WORKERS=1 for the migrate-time opik.Opik() client. This reduces workspace-level in-flight HTTP writes during the experiment cascade from up to 5 → 2 (4 streamer consumers + main thread → 1 streamer consumer + main thread). Migrate's dataset writes were already serialized on the main thread (raw Fern client); this collapses the trace/span/feedback streamer to a single consumer alongside it.

Also adds DEBUG-gated diagnostics so a future failing run is grep-able:

* per-version source counts + post-apply re-fetch of destination (items_total, streamed count) with WARNING on mismatch in version_replay.py
* per-experiment cascade summary in experiments.py (source items, trace ids, span/comment counts attempted)
* migrate-scoped httpx event hook (_debug_http.py) tagging every outbound request with thread id, start/end timestamps, elapsed ms, and a per-host in_flight counter; installed only when the opik logger is at DEBUG so production callers pay zero overhead. Idempotent across repeat calls (review fix at 6932701f1).

Adds a stress E2E (tests/e2e/cli/test_migrate_dataset_stress_e2e.py) mirroring the QA repro shapes (qa-v1-stress-traces-5k: 100 items + 5000 cascade; qa-v1-stress-mega: 1458 items × 27 versions). The test asserts exact counts at every layer (dataset_items_count, per-version items_total, per-version stream, experiment items) so a future regression that silently drops items fails loudly. Slow shapes marked with pytest.mark.slow. Seed bulk-writes wrap with ensure_rest_api_call_respecting_rate_limit (commit 2006b472b) so the test is runnable on rate-limited environments.

Honest scope. The user-visible data loss QA reported requires concurrent writers on the same dataset_version chain (OPIK-6600) or a multi-replica ClickHouse visibility window (OPIK-6601). Migrate by itself does NOT manufacture dataset-endpoint concurrency; this was verified via the new httpx interceptor showing 63 dataset writes all at in_flight ≤ 1 across a mega-shape run. Staging A/B (below) further confirms Option A doesn't change pass/fail status on the multi-replica BE topology QA used. This PR is defensive hardening + diagnostics + a regression net; the user-visible data loss must still be addressed BE-side under OPIK-6600 / OPIK-6601.

Change checklist
Issues
AI-WATERMARK
AI-WATERMARK: yes
Testing
Tested on two BE topologies:

Local Opik backend (http://localhost:5174/api, single-replica ClickHouse, default opik.sh --backend setup):

* pytest tests/e2e/cli/test_migrate_dataset_stress_e2e.py -v → 3/3 PASS in 57.7s
* migrate.http hook installed line fires; per-version POST-APPLY counts agree at every version; 39 of 40 dataset HTTP windows at in_flight=1, one in_flight=2 (background is-alive/ping)

Staging (https://staging.dev.comet.com/opik/api, multi-replica, the BE topology QA was on): ran the full stress matrix WITH and WITHOUT Option A to A/B the effect:

| | background_workers=1) | background_workers=4) |
|---|---|---|
| smoke | | |
| mega (1458 × 27) | | |
| traces-5k (5000 cascade) | | |

Both configurations pass at QA-equivalent scale on staging. Option A's 5 → 2 reduction doesn't change pass/fail on the topologies tested.

Negative test: ran a separate same-dataset concurrent script (5 threads × 10 rounds × 5 items on ONE shared dataset), which reproduces OPIK-6600's symptoms reliably (76-80% data loss, 3/3 runs). This confirms our diagnostics + test infrastructure can catch the BE bug when same-dataset concurrency is present, and that migrate alone doesn't produce that concurrency.
Pre-commit: ruff, ruff format, mypy → all pass.
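The in_flight figures quoted in these testing notes come from a per-host counter updated on request start and response arrival. A minimal sketch of such a counter follows; InFlightTracker is a hypothetical class, not the code in _debug_http.py. It is written against plain host strings so it runs without httpx, on the assumption that request and response event hooks would call on_request and on_response with the request's host.

```python
import threading
from collections import defaultdict
from typing import DefaultDict


class InFlightTracker:
    """Thread-safe count of requests currently in flight, keyed by host."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight: DefaultDict[str, int] = defaultdict(int)
        # Highest in-flight count observed so far for each host.
        self.peak: DefaultDict[str, int] = defaultdict(int)

    def on_request(self, host: str) -> int:
        """Call when a request starts; returns the count including it."""
        with self._lock:
            self._in_flight[host] += 1
            current = self._in_flight[host]
            self.peak[host] = max(self.peak[host], current)
            return current

    def on_response(self, host: str) -> int:
        """Call when a response arrives; returns the remaining count."""
        with self._lock:
            self._in_flight[host] -= 1
            return self._in_flight[host]
```

A peak of 1 for a host means no two requests to it ever overlapped during the run, which is the property the "all at in_flight ≤ 1" claim refers to. Registering the same hooks twice would double every increment, which is why the idempotency fix matters for this counter.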
Documentation
N/A: debug-only diagnostics, no new user-facing behavior. The single user-observable change is a potentially slower experiment cascade for very large datasets, due to the single streamer consumer. On staging, traces-5k actually went from 7m16s to 6m30s (faster, because of less BE contention), but local Docker doesn't reproduce that. The change is documented in the in-code comment at cli/migrate/main.py:_build_client.