Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion sdks/python/src/opik/cli/migrate/datasets/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,19 @@ def _on_inner_step(
# BEFORE ``apply_fn`` so the per-action ``ok`` record can't carry
# post-apply counters; a summary record after the cascade is the
# cleanest additive surface.
# Summary status flips to "failed" when any skip counter is non-zero so
# a programmatic consumer doing ``jq '.actions[] | select(.status ==
# "failed")'`` picks it up. Per-(experiment, reason) ``skip`` records
# emitted by the cascade itself carry ``status="skipped"`` and the
# affected source ids; this summary aggregates the totals (OPIK-6599).
has_skips = (
result.experiments_skipped
+ result.items_skipped_missing_trace
+ result.items_skipped_missing_item
) > 0
audit.record(
type="cascade_experiments_summary",
status="ok",
status="failed" if has_skips else "ok",
details={
"source_dataset_id": action.source_dataset_id,
"to_dataset": action.dest_name,
Expand Down
89 changes: 88 additions & 1 deletion sdks/python/src/opik/cli/migrate/datasets/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ def finish(self, label: str = "done") -> None:

_EXPERIMENT_PAGE_SIZE = 100

# Cap the per-record ``sample_source_ids`` list so a pathological
# all-items-missing case doesn't bloat the audit JSON. The count is
# always recorded in full; the sample is only there to give an operator
# enough breadcrumbs to investigate a few offending source ids.
_SKIP_SAMPLE_LIMIT = 20

# Buffer around the experiment's trace start/end times when bulk-fetching
# spans via ``search_spans(from_time, to_time)``. Late-arriving spans
# (the streamer is async; a span tied to a trace can land after the trace's
Expand Down Expand Up @@ -236,7 +242,6 @@ def cascade_experiments(
``{"id", "name", "reason"}`` entries for the audit log.
"""
result = ExperimentCascadeResult()
del audit # not currently used (umbrella action wraps via execute_plan_loop)

# Default to an empty remap so callers that haven't picked up the
# new kwarg (older tests, ad-hoc invocations) behave the same as
Expand Down Expand Up @@ -278,6 +283,7 @@ def cascade_experiments(
item_id_remap=item_id_remap,
optimization_id_remap=optimization_id_remap,
result=result,
audit=audit,
inner_progress_callback=inner_progress_callback,
)

Expand All @@ -298,6 +304,7 @@ def cascade_one_experiment(
item_id_remap: Dict[str, str],
optimization_id_remap: Optional[Dict[str, str]] = None,
result: ExperimentCascadeResult,
audit: Optional[AuditLog] = None,
inner_progress_callback: Optional[InnerProgressCallback] = None,
) -> None:
"""Migrate one source experiment: read items -> copy traces + spans ->
Expand Down Expand Up @@ -449,16 +456,96 @@ def cascade_one_experiment(
"reason": "recreate_experiment returned False",
}
)
_record_skip(
audit,
reason="experiment_recreate_returned_false",
experiment_id=experiment_id,
experiment_name=source_experiment.name,
count=1,
sample_source_ids=[experiment_id],
)

# Tally per-item skips visible after the recreate call. ``recreate_experiment``
# prints its own skip counts but doesn't return them; we infer the two
# mapping-miss totals by comparing source items against the remap entries
# so the cascade-level audit counters stay accurate.
#
# Per-(experiment, reason) audit records are emitted at the end with
# the affected source ids (capped at ``_SKIP_SAMPLE_LIMIT``) so the
# CLI can fail loud with a machine-readable breakdown -- see OPIK-6599.
# Cap the per-reason sample lists during collection -- ``_record_skip``
# would slice them anyway, but trimming early bounds peak memory in
# the pathological case (e.g. 10k items all missing the same remap).
# ``count`` comes from the always-fully-incremented counters so the
# audit record carries the true total even when the sample is capped.
missing_trace_count = 0
missing_item_count = 0
missing_trace_sample: List[str] = []
missing_item_sample: List[str] = []
for item in items:
if item.trace_id and item.trace_id not in result.trace_id_remap:
result.items_skipped_missing_trace += 1
missing_trace_count += 1
if len(missing_trace_sample) < _SKIP_SAMPLE_LIMIT:
missing_trace_sample.append(item.trace_id)
if item.dataset_item_id and item.dataset_item_id not in item_id_remap:
result.items_skipped_missing_item += 1
Comment on lines +472 to 492
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing_trace_ids/missing_item_ids grow unbounded before being trimmed in _record_skip — should we cap them at _SKIP_SAMPLE_LIMIT during collection? Also, should we add a test for the missing-trace negative path asserting a skip audit with reason == "items_missing_trace_remap" and the expected count/sample?

Finding types: Reliable test patterns Batch and stream | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents
Two related improvements in `cascade_one_experiment` (experiments.py ~lines 472-503): 1.
Cap sample lists during collection: instead of appending every missing id and trimming
later in `_record_skip`, append only while `len(missing_trace_ids) < _SKIP_SAMPLE_LIMIT`
(same for `missing_item_ids`). Track total miss counts independently (or via
already-incremented counters like `result.items_skipped_missing_trace`) and pass those
as `count=` to `_record_skip`. 2. In `test_migrate_dataset_experiments_cascade.py`, add
a test for the missing-trace negative path: simulate an experiment cascade where
`dataset_item_id` remap is present but `trace_id` is NOT in `result.trace_id_remap`.
Assert the audit log receives exactly one `skip` record with `details.reason ==
"items_missing_trace_remap"`, `details.count` equals the expected number of missing
trace ids, and `details.sample_source_ids` contains the expected offending trace ids
(respecting `_SKIP_SAMPLE_LIMIT`).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit e624694 addressed this comment by capping the missing-trace/item sample lists as they are collected and passing the total counts to the new _record_skip helper so the audit record keeps accurate counts with a bounded sample. It also emits explicit skip audit records for the missing-trace and missing-item reasons, including the newly added _SKIP_SAMPLE_LIMIT. The accompanying tests now cover the missing-trace negative path, asserting the audit reason, count, and sampled source IDs are as expected.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the unbounded growth. Fixed in e624694: missing_trace_sample / missing_item_sample are now capped at _SKIP_SAMPLE_LIMIT during collection (the appends are guarded by len(...) < _SKIP_SAMPLE_LIMIT). count continues to come from the always-fully-incremented result.items_skipped_* counters, so the audit record still carries the true total even when the sample is capped.

Also added the missing-trace negative-path test you asked for: TestSkipAuditRecords.test_missing_trace_remap__emits_skip_record. It monkey-patches _copy_traces_and_spans so the trace_id_remap is deliberately incomplete after the trace-copy phase (mirroring a production BE quirk where a trace gets dropped mid-cascade), then asserts the cascade emits exactly one skip record with reason == "items_missing_trace_remap", the right count, and the orphan trace_id in sample_source_ids.

missing_item_count += 1
if len(missing_item_sample) < _SKIP_SAMPLE_LIMIT:
missing_item_sample.append(item.dataset_item_id)

if missing_trace_count:
_record_skip(
audit,
reason="items_missing_trace_remap",
experiment_id=experiment_id,
experiment_name=source_experiment.name,
count=missing_trace_count,
sample_source_ids=missing_trace_sample,
)
if missing_item_count:
_record_skip(
audit,
reason="items_missing_dataset_item_remap",
experiment_id=experiment_id,
experiment_name=source_experiment.name,
count=missing_item_count,
sample_source_ids=missing_item_sample,
)


def _record_skip(
    audit: Optional[AuditLog],
    *,
    reason: str,
    experiment_id: str,
    experiment_name: Optional[str],
    count: int,
    sample_source_ids: List[str],
) -> None:
    """Write one ``skip`` entry for a single (experiment, reason) pair.

    The entry is recorded with ``type="skip"`` and ``status="skipped"``.
    ``count`` is forwarded unchanged, while ``sample_source_ids`` is
    truncated to at most ``_SKIP_SAMPLE_LIMIT`` entries so that a skip
    affecting a very large number of items cannot inflate the audit JSON.

    When ``audit`` is ``None`` nothing is recorded, so callers that do not
    supply an audit log (tests, ad-hoc invocations) keep working unchanged.
    """
    if audit is not None:
        # Slicing yields a fresh list, so the caller's list is never
        # shared with (or mutated through) the audit entry.
        capped_sample = sample_source_ids[:_SKIP_SAMPLE_LIMIT]
        skip_details = {
            "reason": reason,
            "experiment_id": experiment_id,
            "experiment_name": experiment_name,
            "count": count,
            "sample_source_ids": capped_sample,
        }
        audit.record(type="skip", status="skipped", details=skip_details)


def _list_source_experiments(
Expand Down
105 changes: 98 additions & 7 deletions sdks/python/src/opik/cli/migrate/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import sys
import time
from pathlib import Path
from typing import Any, Iterator, Optional
from typing import Any, Dict, Iterator, Optional

import click
from rich.console import Console
Expand Down Expand Up @@ -48,6 +48,11 @@
)

console = Console()
# Dedicated stderr console for the loud-fail path so the SKIP_SUMMARY
# line lands on stderr without flipping the default console (OPIK-6599).
# Tests assert against this stream; CI gates can grep stderr without
# parsing the audit JSON.
_stderr_console = Console(stderr=True)

MIGRATE_CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}

Expand Down Expand Up @@ -185,6 +190,91 @@ def _finalize_and_fail(
sys.exit(1)


def _finalize_with_skips_or_ok(
audit: AuditLog,
audit_path: Path,
name: str,
target_label: str,
target_project: str,
elapsed_seconds: float,
) -> None:
"""Finalize the audit log, then either fail loud on skips or print the
happy-path message.

Per OPIK-6599: when the cascade emits any ``skip`` audit record, the
migrate is "succeeded but lossy" — the destination state has partial
data and is **not** rolled back. We finalize the audit to ``failed``,
print a SKIP_SUMMARY line to **stderr** so CI pipelines can grep
without parsing the JSON, and exit non-zero. Operators rely on the
audit log to know what made it across.
"""
skip_records = [
action for action in audit.actions if action.get("status") == "skipped"
]
if not skip_records:
audit.finalize("ok")
audit.write(audit_path)
elapsed = _format_elapsed(elapsed_seconds)
console.print(
f"[green]Migrated '{name}' into project '{target_project}' as "
f"'{target_label}'.[/green] Took {elapsed}. Audit log: {audit_path}"
)
return
Comment on lines +211 to +222
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SKIP_SUMMARY reads record.get("reason", ""), but _record_skip puts reason under details, so reason_to_total_key.get(...) ends up None and totals/total_skipped stay at 0; should we pull reason and count from record.get("details", {}).get("reason", "") and record.get("details", {}).get("count", 0)?

Finding type: Validate nullable inputs | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/main.py around lines 211-242 inside
`_finalize_with_skips_or_ok`, fix the SKIP_SUMMARY aggregation logic so it reads skip
classification from the actual audit record shape. Update the loop to extract `reason`
and `count` from `record.get('details', {})` (e.g., `details.get('reason','')` and
`details.get('count',0)`) instead of `record.get('reason', '')`, and then use those
values to increment `totals` so `total_skipped` reflects real skips. Ensure the existing
mapping in `reason_to_total_key` and the final stderr messages stay the same, but now
compute correct counts when skip records are present.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified this against the audit-record shape and the bot is mistaken here. AuditLog.record(details=...) in audit.py:51-52 flattens details into the entry via entry.update(details), so record["reason"] and record["count"] are on the top level, not nested under details. Quick repro:

>>> from opik.cli.migrate.audit import AuditLog
>>> a = AuditLog(command='test', args={})
>>> a.record(type='skip', status='skipped', details={'reason': 'r', 'count': 42})
>>> a.actions[0]
{'type': 'skip', 'status': 'skipped', 'recorded_at': '...', 'reason': 'r', 'count': 42}

The existing test_multiple_skip_records__totals_aggregated_by_reason test asserts the aggregation produces items_skipped_missing_trace=14 and items_skipped_missing_item=6 from skip records carrying count=7 and count=3 respectively — it would fail with =0 =0 if the read sites were broken. So the current code is correct as written.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the correction — you're right, I misread the audit record shape here. I'll save this to memory once the PR is merged.


# Aggregate counts by reason for the SKIP_SUMMARY line. The cascade
# summary record carries the totals too, but reading from skip records
# directly keeps the message decoupled from the summary record's shape.
totals: Dict[str, int] = {
"experiments_skipped": 0,
"items_skipped_missing_trace": 0,
"items_skipped_missing_item": 0,
}
reason_to_total_key = {
"experiment_recreate_returned_false": "experiments_skipped",
"items_missing_trace_remap": "items_skipped_missing_trace",
"items_missing_dataset_item_remap": "items_skipped_missing_item",
}
Comment on lines +233 to +236
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The skip-aggregation loop reads record.get("reason")/record.get("count"), but _record_skip stores them under details, so total_key ends up None and the SKIP_SUMMARY/total_skipped stays 0 even when skips exist—should we read record.get("details", {})["reason"|"count"] or move those fields to the top level?

Finding type: Centralize error handling | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/main.py around lines 193-260, inside the
_finalize_with_skips_or_ok function where it builds totals from skip_records, fix the
aggregation to read reason and count from the correct location. The current code uses
record.get("reason") and record.get("count"), but skip records created elsewhere store
those fields under record["details"] (e.g., details={"reason": ..., "count": ...}).
Refactor the loop so it pulls details = record.get("details", {}) and then uses
details.get("reason", "") and int(details.get("count", 0)) when incrementing totals, so
SKIP_SUMMARY and total_skipped accurately reflect real skips. If there are tests
asserting the SKIP_SUMMARY fields, update them or add a targeted test to cover this
mismatch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as my reply on the sibling thread above — false positive. AuditLog.record(details=...) flattens details into the entry (audit.py:51-52), so top-level record.get("reason") / record.get("count") is correct. Verified with the test_multiple_skip_records__totals_aggregated_by_reason test which asserts non-zero aggregated totals.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the correction — that makes sense. I’ll save this to memory once the PR is merged.

for record in skip_records:
total_key = reason_to_total_key.get(record.get("reason", ""))
if total_key is None:
continue
totals[total_key] += int(record.get("count", 0))

total_skipped = sum(totals.values())
audit.finalize("failed")
audit.write(audit_path)

elapsed = _format_elapsed(elapsed_seconds)
_stderr_console.print(
f"[red]opik migrate: {total_skipped} item{'s' if total_skipped != 1 else ''} "
f"skipped — destination state was NOT rolled back; see audit log: "
f"{audit_path}[/red]"
)
_stderr_console.print(
f"SKIP_SUMMARY: "
f"experiments_skipped={totals['experiments_skipped']} "
f"items_skipped_missing_trace={totals['items_skipped_missing_trace']} "
f"items_skipped_missing_item={totals['items_skipped_missing_item']}"
)
# High-level rollback hint. We deliberately don't ship a step-by-step
# CLI playbook here -- the audit log is the source of truth for what
# was actually created. Each ``ok`` action in ``audit.actions`` carries
# the destination entity id; an operator can grep the audit JSON to
# see exactly what landed in the destination project before deciding
# what to delete. Auto-rollback (a one-flag clean reverse) is tracked
# as a follow-up; this PR is the loud-fail mechanic only.
_stderr_console.print(
f"[yellow]To roll back manually: in project "
f"'{target_project}', delete the destination dataset "
f"'{target_label}' along with any experiments, optimizations, "
f"traces, and spans that were cascaded into it (the audit log "
Comment on lines +259 to +270
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rollback hint is printed via _stderr_console.print (stderr), but CI guidance that captures only stdout through tee … won’t include it—should we emit it on stdout instead (e.g., print or a stdout console)?

Finding type: Prefer standard logging | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
sdks/python/src/opik/cli/migrate/main.py around lines 259-273 where the “High-level
rollback hint” is emitted via `_stderr_console.print`, change it to write to stdout
instead. Replace `_stderr_console` with the stdout console (or use `print`) so CI
pipelines that only capture stdout via `tee` will include the rollback instructions in
summaries/PR bodies. Keep the message content and formatting the same; only the output
stream should change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping — the rollback hint deliberately lives on stderr with the rest of the failure block:

  1. opik migrate: N items skipped — destination state was NOT rolled back; see audit log: <path> (stderr, red)
  2. SKIP_SUMMARY: ... (stderr, machine-parseable)
  3. The rollback hint (stderr, yellow)
  4. (after Xs) (stderr, dim)

These four lines are one cohesive failure summary. Splitting just line 3 off to stdout would scatter the failure narrative across two streams.

CI that captures both streams via 2>&1 (or just redirects stderr via 2>) keeps the hint. CI that captures only stdout via tee was already missing the loud-fail message, the SKIP_SUMMARY line, and the exit-code context — its operators wouldn't even know the migrate failed, so missing the rollback hint isn't the load-bearing gap. Fixing that CI's capture is the right escalation; moving one line to stdout would only paper over it.

🤖 Reply posted via /address-github-pr-comments

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that makes sense — keeping the rollback hint on stderr preserves the single failure-summary block here. I’ll save this to memory once the PR is merged.

f"lists each created entity id); then rename the source "
f"'{name}_v1' back to '{name}'.[/yellow]"
)
_stderr_console.print(f"[dim](after {elapsed})[/dim]")
sys.exit(1)


@migrate_group.command(name="dataset")
@click.argument("name", type=str)
@click.option(
Expand Down Expand Up @@ -289,12 +379,13 @@ def migrate_dataset_command(
elapsed_seconds=time.monotonic() - started_at,
)

audit.finalize("ok")
audit.write(audit_path)
elapsed = _format_elapsed(time.monotonic() - started_at)
console.print(
f"[green]Migrated '{name}' into project '{to_project}' as '{plan.target_name}'.[/green] "
f"Took {elapsed}. Audit log: {audit_path}"
_finalize_with_skips_or_ok(
audit,
audit_path,
name=name,
target_label=plan.target_name,
target_project=to_project,
elapsed_seconds=time.monotonic() - started_at,
)


Expand Down
Loading
Loading