Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/gabriel/tasks/classify.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
warn_if_modality_mismatch,
)
from ..utils.logging import announce_prompt_rendering
from ..utils.file_utils import save_dataframe_with_fallback
from ._attribute_utils import load_persisted_attributes


Expand Down Expand Up @@ -499,7 +500,12 @@ async def run(
disagg_path = os.path.join(
self.cfg.save_dir, f"{base_name}_full_disaggregated.csv"
)
full_df.to_csv(disagg_path, index_label=index_cols)
save_dataframe_with_fallback(
full_df,
disagg_path,
index=True,
label="Classify",
)

# aggregate across runs using a minimum frequency threshold
def _min_freq(s: pd.Series) -> Optional[bool]:
Expand Down Expand Up @@ -552,7 +558,12 @@ def _min_freq(s: pd.Series) -> Optional[bool]:

result_to_save = result.copy()
result_to_save["predicted_classes"] = result_to_save["predicted_classes"].apply(json.dumps)
result_to_save.to_csv(out_path, index=False)
save_dataframe_with_fallback(
result_to_save,
out_path,
index=False,
label="Classify",
)

# keep raw response files for reference

Expand Down
3 changes: 2 additions & 1 deletion src/gabriel/tasks/codify.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
warn_if_modality_mismatch,
)
from ..utils.logging import announce_prompt_rendering
from ..utils.file_utils import save_dataframe_with_fallback


@dataclass
Expand Down Expand Up @@ -1018,7 +1019,7 @@ async def run(
]

output_path = os.path.join(self.cfg.save_dir, "coded_passages.csv")
df_proc.to_csv(output_path, index=False)
save_dataframe_with_fallback(df_proc, output_path, index=False, label="Codify")

if self.cfg.debug_print:
print(f"\n[DEBUG] Processing complete. Results saved to: {self.cfg.save_dir}")
Expand Down
3 changes: 2 additions & 1 deletion src/gabriel/tasks/debias.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def tqdm(iterable: Iterable, **_: Any) -> Iterable:
from .paraphrase import Paraphrase, ParaphraseConfig
from .rank import Rank, RankConfig
from .rate import Rate, RateConfig
from ..utils.file_utils import save_dataframe_with_fallback
try: # statsmodels is optional; fall back to a lightweight solver if missing
from ..utils.plot_utils import fit_ols as _fit_ols
from ..utils.plot_utils import regression_plot as _regression_plot
Expand Down Expand Up @@ -571,7 +572,7 @@ async def run(

results_df = df_master.reset_index(drop=True)
results_path = metadata["result_path"]
results_df.to_csv(results_path, index=False)
save_dataframe_with_fallback(results_df, results_path, index=False, label="Debias")
if self.cfg.verbose:
print(f"[Debias] Results saved to {results_path}")

Expand Down
3 changes: 2 additions & 1 deletion src/gabriel/tasks/deidentify.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..utils import safest_json
from ..utils.openai_utils import get_all_responses
from ..utils.logging import announce_prompt_rendering
from ..utils.file_utils import save_dataframe_with_fallback


# ────────────────────────────
Expand Down Expand Up @@ -308,5 +309,5 @@ async def run(

df_proc["mapping"] = mappings_col
df_proc["deidentified_text"] = deidentified_texts
df_proc.to_csv(csv_path, index=False)
save_dataframe_with_fallback(df_proc, str(csv_path), index=False, label="Deidentify")
return df_proc
14 changes: 10 additions & 4 deletions src/gabriel/tasks/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
warn_if_modality_mismatch,
)
from ..utils.logging import announce_prompt_rendering
from ..utils.file_utils import save_dataframe_with_fallback
from ._attribute_utils import load_persisted_attributes


Expand Down Expand Up @@ -175,7 +176,7 @@ async def run(
result["entity_name"] = pd.NA
for attr in self.cfg.attributes.keys():
result[attr] = pd.NA
result.to_csv(out_path, index=False)
save_dataframe_with_fallback(result, out_path, index=False, label="Extract")
return result

attr_items = list(self.cfg.attributes.items())
Expand Down Expand Up @@ -358,7 +359,12 @@ async def run(
disagg_path = os.path.join(
self.cfg.save_dir, f"{base_name}_full_disaggregated.csv"
)
full_df.to_csv(disagg_path, index_label=["id", "entity_name", "run"])
save_dataframe_with_fallback(
full_df,
disagg_path,
index=True,
label="Extract",
)

def _pick_first(s: pd.Series) -> str:
for val in s.dropna():
Expand Down Expand Up @@ -403,7 +409,7 @@ def _pick_first(s: pd.Series) -> str:
final_order.extend(remaining)
result = result[final_order]

result.to_csv(out_path, index=False)
save_dataframe_with_fallback(result, out_path, index=False, label="Extract")

result = result.replace("unknown", pd.NA)

Expand All @@ -427,7 +433,7 @@ def _pick_first(s: pd.Series) -> str:
coerced[col] = conv
fail_logs[col] = int((non_null & conv.isna()).sum())
coerced_path = os.path.join(self.cfg.save_dir, f"{base_name}_cleaned_coerced.csv")
coerced.to_csv(coerced_path, index=False)
save_dataframe_with_fallback(coerced, coerced_path, index=False, label="Extract")
for col, n_fail in fail_logs.items():
print(f"[Extract] Failed to coerce {n_fail} values in column '{col}'.")
result = coerced
Expand Down
3 changes: 2 additions & 1 deletion src/gabriel/tasks/paraphrase.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
warn_if_modality_mismatch,
)
from ..utils.logging import announce_prompt_rendering
from ..utils.file_utils import save_dataframe_with_fallback

# Import classifier utilities for recursive validation. Importing from
# ``gabriel.tasks.classify`` does not introduce a circular dependency
Expand Down Expand Up @@ -294,7 +295,7 @@ async def run(
self.cfg.save_dir,
f"{os.path.splitext(self.cfg.file_name)[0]}_cleaned.csv",
)
df_proc.to_csv(out_path, index=False)
save_dataframe_with_fallback(df_proc, out_path, index=False, label="Paraphrase")
return df_proc

async def _recursive_validate(
Expand Down
10 changes: 8 additions & 2 deletions src/gabriel/tasks/rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
warn_if_modality_mismatch,
)
from ..utils.logging import announce_prompt_rendering
from ..utils.file_utils import save_dataframe_with_fallback
from ._attribute_utils import load_persisted_attributes


Expand Down Expand Up @@ -328,15 +329,20 @@ async def run(
disagg_path = os.path.join(
self.cfg.save_dir, f"{base_name}_full_disaggregated.csv"
)
full_df.to_csv(disagg_path, index_label=["id", "run"])
save_dataframe_with_fallback(
full_df,
disagg_path,
index=True,
label="Rate",
)

# aggregate across runs
agg_df = full_df.groupby("id")[list(self.cfg.attributes)].mean()

out_path = os.path.join(self.cfg.save_dir, f"{base_name}_cleaned.csv")
result = df_proc.merge(agg_df, left_on="_gid", right_index=True, how="left")
result = result.drop(columns=["_gid"])
result.to_csv(out_path, index=False)
save_dataframe_with_fallback(result, out_path, index=False, label="Rate")

# keep raw response files for reference

Expand Down
3 changes: 2 additions & 1 deletion src/gabriel/tasks/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from gabriel.utils import safest_json
from gabriel.utils.openai_utils import get_all_responses
from gabriel.utils.logging import announce_prompt_rendering
from gabriel.utils.file_utils import save_dataframe_with_fallback


@dataclass
Expand Down Expand Up @@ -303,7 +304,7 @@ def _finalize_entities(self, entities: List[str]) -> pd.DataFrame:
df["source_batch"] = df.index // max(self.cfg.entities_per_generation, 1)
df["source_identifier"] = ["seed" for _ in range(len(entities))]
final_path = os.path.join(self.cfg.save_dir, self.cfg.file_name)
df.to_csv(final_path, index=False)
save_dataframe_with_fallback(df, final_path, index=False, label="Seed")
print(
f"[Seed] Generated {len(df)} entities. Saved aggregated seeds to {final_path}."
)
Expand Down
3 changes: 2 additions & 1 deletion src/gabriel/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
)
from .prompt_utils import swap_circle_square
from .modality_utils import warn_if_modality_mismatch
from .file_utils import load
from .file_utils import load, save_dataframe_with_fallback

__all__ = [
"get_response",
Expand Down Expand Up @@ -61,4 +61,5 @@
"swap_circle_square",
"warn_if_modality_mismatch",
"load",
"save_dataframe_with_fallback",
]
86 changes: 86 additions & 0 deletions src/gabriel/utils/file_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import importlib
import math
import os
from typing import Any, Dict, Iterable, List, Optional, Set

Expand Down Expand Up @@ -57,6 +58,91 @@
PDF_EXTENSION_SUFFIXES = {ext.lstrip(".") for ext in PDF_EXTENSIONS}


def save_dataframe_with_fallback(
    df: pd.DataFrame,
    path: str,
    *,
    index: bool = False,
    chunk_size: int = 100_000,
    label: Optional[str] = None,
) -> List[str]:
    """Persist ``df`` to CSV, falling back to chunked files on failure.

    The function first attempts to save to ``path``. If that fails (for example
    due to very large output files), it retries by splitting the DataFrame into
    ``chunk_size`` row chunks and writing ``<stem>_1.csv``, ``<stem>_2.csv``,
    and so on. Failures are logged and surfaced via console messages, but never
    raised, so callers can continue returning the in-memory DataFrame.

    Parameters
    ----------
    df:
        The DataFrame to persist.
    path:
        Destination file path; ``~`` and environment variables are expanded.
    index:
        Forwarded to :meth:`pandas.DataFrame.to_csv`.
    chunk_size:
        Rows per fallback chunk file; coerced to at least 1, and reset to
        100,000 if it cannot be interpreted as an integer.
    label:
        Optional task name used to prefix console messages (e.g. ``"Rate"``).

    Returns
    -------
    list[str]
        Paths that were successfully written. An empty list indicates all save
        attempts failed.
    """

    resolved_path = os.path.expandvars(os.path.expanduser(path))
    prefix = f"[{label}] " if label else ""

    # Best-effort creation of the parent directory; a failure here is not
    # fatal because ``to_csv`` below will surface any real problem.
    parent = os.path.dirname(resolved_path)
    if parent:
        try:
            os.makedirs(parent, exist_ok=True)
        except OSError:
            logger.debug("Could not ensure parent directory for %s", resolved_path)

    # Normalise the chunk size up front so the failure message can report the
    # value actually used (previously it hardcoded "100k rows each" even when
    # callers supplied a different ``chunk_size``).
    try:
        chunk = max(1, int(chunk_size))
    except (TypeError, ValueError):
        chunk = 100_000

    try:
        df.to_csv(resolved_path, index=index)
        return [resolved_path]
    except Exception as primary_exc:  # deliberately broad: never raise to callers
        print(
            f"{prefix}Unable to save final CSV to {resolved_path}. "
            f"Trying chunked backup files ({chunk:,} rows each)."
        )
        logger.warning(
            "Failed to save CSV to %s; attempting chunked backup files.",
            resolved_path,
            exc_info=primary_exc,
        )

    stem, ext = os.path.splitext(resolved_path)
    ext = ext or ".csv"
    total_rows = len(df)
    # Write at least one part even for an empty DataFrame so the header row
    # still lands on disk.
    part_count = max(1, math.ceil(total_rows / chunk))
    saved_paths: List[str] = []
    try:
        for idx in range(part_count):
            start = idx * chunk
            part_path = f"{stem}_{idx + 1}{ext}"
            df.iloc[start : start + chunk].to_csv(part_path, index=index)
            saved_paths.append(part_path)
    except Exception as split_exc:  # deliberately broad: never raise to callers
        print(
            f"{prefix}Final DataFrame could not be saved to disk. "
            "Returning the DataFrame in memory only."
        )
        logger.warning(
            "Chunked CSV fallback failed for %s.",
            resolved_path,
            exc_info=split_exc,
        )
        return []

    print(
        f"{prefix}Saved fallback split files ({part_count} part(s)) with base path {stem}_*.{ext.lstrip('.')}"
    )
    logger.warning(
        "Saved chunked CSV fallback for %s into %d part(s).",
        resolved_path,
        part_count,
    )
    return saved_paths


def load(
folder_path: str,
extensions: Optional[Iterable[str]] = None,
Expand Down
Loading
Loading