Dev#1499
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ead, write, metadata ops
…ta-rs API surface
- New write() signature with mode, schema_mode, configuration, predicate, partition_by, storage_options, name, description as keyword-only params
- Deprecated if_exists param with DeprecationWarning
- Unified Polars/Pandas/Arrow paths through write_deltalake
- Wrapped blocking write_deltalake in asyncio.to_thread()
- Removed deprecated engine="rust" parameter
- Forward **kwargs for advanced options (writer_properties, etc.)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

…h new write interface
- Added schema_mode, configuration params to create()
- Wrapped write_deltalake in asyncio.to_thread()
- Added Polars->Arrow normalization
- Fixed Path(str)->Path(path) and Path(str)->Path(data) bugs
- Removed engine="rust", omit None values from args
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

…a write interface
19 tests covering: all 4 write modes, Pandas/Polars/Arrow input, schema_mode merge/overwrite, configuration forwarding, predicate targeted overwrite, if_exists deprecation warning, partition_by, storage_options fallback, name/description metadata, and create() with schema_mode and configuration.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Added test_write_new_api() demonstrating: mode overwrite, schema_mode merge, configuration metadata, predicate targeted overwrite, name/description, backward-compatible if_exists, and partition_by.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
spec status changed to approved by linter
Merged new-driver branch with all FEAT-003 changes:
- TASK-011: Refactored write() with full delta-rs API surface
- TASK-012: Refactored create() method
- TASK-013: Unit tests (19/19 passing)
- TASK-014: Example script updates
All 4 tasks verified and closed.
Reviewer's Guide
Refactors the Delta driver’s create/write APIs to be thin async wrappers around delta-rs write_deltalake with expanded parameter support, adds comprehensive tests and specs for the new behavior, updates examples, and records completion of several SDD tasks; also adds a few local helper scripts and workspace artifacts.
Sequence diagram for the refactored DeltaDriver.write async flow
sequenceDiagram
actor Caller
participant DeltaDriver
participant AsyncIO as asyncio.to_thread
participant DeltaRS as write_deltalake
Caller->>DeltaDriver: write(data, table_id, path, mode, schema_mode, partition_by, configuration, storage_options, predicate, name, description, if_exists, **kwargs)
activate DeltaDriver
alt if_exists is not None
DeltaDriver->>DeltaDriver: warnings.warn("if_exists is deprecated")
DeltaDriver->>DeltaDriver: mode = if_exists
end
DeltaDriver->>DeltaDriver: if isinstance(data, pl.DataFrame): data = data.to_arrow()
DeltaDriver->>DeltaDriver: destination = Path(path) / table_id
DeltaDriver->>DeltaDriver: build args dict (mode, schema_mode, partition_by, configuration, predicate, name, description, storage_options or self.storage_options, **kwargs) excluding None
DeltaDriver->>AsyncIO: to_thread(write_deltalake, destination, data, **args)
activate AsyncIO
AsyncIO->>DeltaRS: write_deltalake(destination, data, **args)
activate DeltaRS
DeltaRS-->>AsyncIO: success or raises DeltaError/DeltaProtocolError
deactivate DeltaRS
AsyncIO-->>DeltaDriver: result or exception
deactivate AsyncIO
alt success
DeltaDriver->>DeltaDriver: self._delta = destination
DeltaDriver-->>Caller: return None
else DeltaError or DeltaProtocolError
DeltaDriver->>DeltaDriver: wrap in DriverError
DeltaDriver-->>Caller: raise DriverError
else other Exception
DeltaDriver->>DeltaDriver: wrap in DriverError
DeltaDriver-->>Caller: raise DriverError
end
deactivate DeltaDriver
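The write flow in the diagram above can be sketched in plain Python. This is a minimal sketch under stated assumptions, not the driver's actual code: `write_sketch`, the injected `writer` callable (standing in for `write_deltalake`), and the local `DriverError` class are hypothetical names chosen so the example runs without delta-rs installed. It also returns the destination instead of storing it on `self._delta`, purely to make the result easy to inspect.

```python
import asyncio
import warnings
from pathlib import Path
from typing import Any, Callable, Optional


class DriverError(Exception):
    """Hypothetical stand-in for the driver's own error type."""


async def write_sketch(
    writer: Callable[..., Any],  # stands in for write_deltalake (assumption)
    data: Any,
    table_id: str,
    path: Path,
    *,
    mode: str = "append",  # assumed default; the source does not state it
    schema_mode: Optional[str] = None,
    partition_by: Optional[list] = None,
    if_exists: Optional[str] = None,
    **kwargs: Any,
) -> Path:
    # Deprecated alias: warn, then let it override mode, as in the diagram.
    if if_exists is not None:
        warnings.warn(
            "if_exists is deprecated, use mode instead",
            DeprecationWarning,
            stacklevel=2,
        )
        mode = if_exists

    destination = Path(path) / table_id

    # Build the keyword arguments, dropping anything left as None.
    candidates = {
        "mode": mode,
        "schema_mode": schema_mode,
        "partition_by": partition_by,
        **kwargs,
    }
    args = {key: value for key, value in candidates.items() if value is not None}

    try:
        # Run the blocking writer in a worker thread, off the event loop.
        await asyncio.to_thread(writer, destination, data, **args)
    except Exception as exc:
        # Any failure is wrapped in the driver-level error type.
        raise DriverError(f"Delta write failed: {exc}") from exc
    return destination
```

Passing the writer in as a parameter is only a convenience for the sketch; it lets a fake callable record the arguments that would reach delta-rs.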
Sequence diagram for the refactored DeltaDriver.create async flow
sequenceDiagram
actor Caller
participant DeltaDriver
participant FS as FileSystem
participant AsyncIO as asyncio.to_thread
participant DeltaRS as write_deltalake
Caller->>DeltaDriver: create(path, data, name, mode, schema_mode, configuration, **kwargs)
activate DeltaDriver
alt path is str
DeltaDriver->>DeltaDriver: path = Path(path).resolve()
end
alt data is str (file path)
DeltaDriver->>DeltaDriver: data = Path(data).resolve()
end
alt data is Path (file)
DeltaDriver->>FS: read file based on suffix (csv, xls/xlsx, parquet)
FS-->>DeltaDriver: data as DataFrame or Table
end
DeltaDriver->>DeltaDriver: if isinstance(data, pl.DataFrame): data = data.to_arrow()
DeltaDriver->>DeltaDriver: build args dict (mode, name, schema_mode, configuration, **kwargs) excluding None
DeltaDriver->>AsyncIO: to_thread(write_deltalake, path, data, **args)
activate AsyncIO
AsyncIO->>DeltaRS: write_deltalake(path, data, **args)
activate DeltaRS
DeltaRS-->>AsyncIO: success or raises DeltaError
deactivate DeltaRS
AsyncIO-->>DeltaDriver: result or exception
deactivate AsyncIO
alt success
DeltaDriver-->>Caller: return None
else DeltaError
DeltaDriver->>DeltaDriver: wrap in DriverError
DeltaDriver-->>Caller: raise DriverError
else other Exception
DeltaDriver->>DeltaDriver: wrap in DriverError
DeltaDriver-->>Caller: raise DriverError
end
deactivate DeltaDriver
Updated class diagram for the Delta driver create and write interfaces
classDiagram
class DeltaDriver {
+dict storage_options
+Path _delta
+async create(path, data, name, mode, schema_mode, configuration, **kwargs) None
+async write(data, table_id, path, mode, schema_mode, partition_by, configuration, storage_options, predicate, name, description, if_exists, **kwargs) None
}
class write_deltalake {
<<function>>
+call(path, data, mode, schema_mode, partition_by, configuration, storage_options, predicate, name, description, **kwargs)
}
class DeltaError {
<<exception>>
}
class DeltaProtocolError {
<<exception>>
}
class DriverError {
<<exception>>
}
class asyncio_to_thread {
<<function>>
+call(func, arg1, arg2, **kwargs)
}
class pandas_DataFrame {
<<data>>
}
class polars_DataFrame {
<<data>>
+to_arrow() Table
}
class pyarrow_Table {
<<data>>
}
DeltaDriver ..> write_deltalake : uses
DeltaDriver ..> DeltaError : handles
DeltaDriver ..> DeltaProtocolError : handles
DeltaDriver ..> DriverError : raises
DeltaDriver ..> asyncio_to_thread : wraps call
DeltaDriver ..> pandas_DataFrame : accepts
DeltaDriver ..> polars_DataFrame : accepts
DeltaDriver ..> pyarrow_Table : accepts
polars_DataFrame --> pyarrow_Table : to_arrow() conversion
Hey - I've found 4 issues, and left some high level feedback:
- The new `write()` implementation unconditionally lets `if_exists` override `mode`; this can surprise callers who explicitly pass `mode`. Consider only honoring `if_exists` when `mode` is still at its default, to match the spec’s intent.
- The `examples/test_elastic.py` file contains hard-coded production-like host and password values; these should be removed or replaced with obvious placeholders before merging to avoid credential leakage.
- Several non-source artifacts appear to have been added (e.g. `activities.csv`, `stores.csv`, `global-bundle.pem`, workspace files, `libraries/python-driver`, `.claude/worktrees/*`); please confirm these are intended to live in the repo and otherwise drop or gitignore them.
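One way to read the first point is that an explicit `mode` should win over the deprecated alias. The sketch below illustrates that precedence; it is an assumption about the intended behaviour, not code from the PR. `resolve_mode`, `DEFAULT_MODE`, and `ALLOWED_MODES` are hypothetical names, and `None` is used as the "caller did not pass `mode`" marker.

```python
import warnings
from typing import Optional

DEFAULT_MODE = "append"  # assumed default; the source does not state it
ALLOWED_MODES = {"error", "append", "overwrite", "ignore"}  # assumed set of write modes


def resolve_mode(mode: Optional[str] = None, if_exists: Optional[str] = None) -> str:
    """Pick the effective write mode, honoring if_exists only when mode is unset."""
    if if_exists is not None:
        warnings.warn(
            "if_exists is deprecated, use mode instead",
            DeprecationWarning,
            stacklevel=2,
        )
        # Reject unknown legacy values early instead of forwarding them.
        if if_exists not in ALLOWED_MODES:
            raise ValueError(
                f"Invalid value for deprecated argument 'if_exists': {if_exists!r}. "
                f"Expected one of: {sorted(ALLOWED_MODES)}."
            )
        if mode is None:
            return if_exists
    # An explicitly passed mode always wins; otherwise fall back to the default.
    return mode if mode is not None else DEFAULT_MODE
```

With this shape, `resolve_mode(mode="error", if_exists="overwrite")` still warns about the deprecated argument but keeps the caller's explicit `mode`.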
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new `write()` implementation unconditionally lets `if_exists` override `mode`; this can surprise callers who explicitly pass `mode`—consider only honoring `if_exists` when `mode` is still at its default to match the spec’s intent.
- The `examples/test_elastic.py` file contains hard-coded production-like host and password values; these should be removed or replaced with obvious placeholders before merging to avoid credential leakage.
- Several non-source artifacts appear to have been added (e.g. `activities.csv`, `stores.csv`, `global-bundle.pem`, workspace files, `libraries/python-driver`, `.claude/worktrees/*`); please confirm these are intended to live in the repo and otherwise drop or gitignore them.
## Individual Comments
### Comment 1
<location path="asyncdb/drivers/delta.py" line_range="165-167" />
<code_context>
if isinstance(data, str):
- data = Path(str).resolve()
+ data = Path(data).resolve()
if isinstance(data, Path):
- # open this file with Pandas or Arrow
+ # Read file into Arrow/Pandas based on extension
ext = data.suffix
if ext == ".csv":
read_options = pcsv.ReadOptions()
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider explicitly handling or rejecting unsupported file extensions when `data` is a `Path`.
For `Path` inputs you only special-case `.csv`, Excel, and `.parquet`; for anything else, the `Path` is passed straight to `write_deltalake`, which expects a DataFrame/Arrow object. Please either raise a clear error for unsupported extensions or validate and document the supported set here rather than silently forwarding the `Path`.
</issue_to_address>
### Comment 2
<location path="asyncdb/drivers/delta.py" line_range="451-457" />
<code_context>
"""
- args = {"mode": if_exists, "engine": "rust", **kwargs}
+ # Handle deprecated if_exists parameter
+ if if_exists is not None:
+ warnings.warn(
+ "if_exists is deprecated, use mode instead",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ mode = if_exists
+
+ # Normalize Polars DataFrame to Arrow for a unified code path
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Deprecated `if_exists` is passed directly to `mode` without validation.
Because `mode` is now typed as `Literal[...]`, assigning `if_exists` directly can allow legacy callers to pass unexpected strings that bypass this contract and flow into `write_deltalake`. Please validate/normalize `if_exists` against the allowed mode values and raise early on invalid input before assigning it to `mode`.
Suggested implementation:
```python
from pathlib import Path
from typing import Literal, cast
```
```python
# Handle deprecated if_exists parameter
if if_exists is not None:
warnings.warn(
"if_exists is deprecated, use mode instead",
DeprecationWarning,
stacklevel=2,
)
# Normalize and validate legacy ``if_exists`` values against supported modes
allowed_modes: set[str] = {"error", "append", "overwrite", "ignore", "merge"}
normalized_if_exists = str(if_exists).lower()
if normalized_if_exists not in allowed_modes:
raise ValueError(
f"Invalid value for deprecated argument 'if_exists': {if_exists!r}. "
f"Expected one of: {sorted(allowed_modes)}."
)
mode = cast(
Literal["error", "append", "overwrite", "ignore", "merge"],
normalized_if_exists,
)
```
1. If `asyncdb/drivers/delta.py` already imports `Literal` from `typing`, adjust the import edit so you only add `cast` instead of re-importing `Literal`, for example:
`from typing import Literal, cast` → or extend the existing `from typing import ...` line.
2. Ensure the `mode` parameter’s `Literal[...]` annotation (in the function signature you didn’t show) matches the set `{"error", "append", "overwrite", "ignore", "merge"}`. If your project uses a different subset or naming, update both `allowed_modes` and the `Literal[...]` in the cast to the exact set used elsewhere.
</issue_to_address>
### Comment 3
<location path="examples/tst_rst_convert.py" line_range="7-12" />
<code_context>
+import pandas as pd
+import rst_convert
+
+credentials = {
+ "user": "troc_pgdata",
+ "password": "12345678",
+ "host": "127.0.0.1",
+ "port": "5432",
+ "database": "navigator"
+}
+
</code_context>
<issue_to_address>
**🚨 issue (security):** Avoid committing hard-coded database credentials in example scripts.
This example uses literal user/password/host/database values. Even if these are non-sensitive now, it normalizes committing credentials and raises the risk of real ones being pushed. Please load connection details from environment variables or a config file with safe defaults instead of embedding them directly in code.
</issue_to_address>
### Comment 4
<location path="tests/test_delta_write.py" line_range="56-65" />
<code_context>
+# --- Backward compatibility: if_exists deprecation ---
+
+
+@pytest.mark.asyncio
+async def test_write_if_exists_deprecated(driver, sample_pandas_df, tmp_path):
+ """write() with if_exists emits DeprecationWarning and still works."""
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+ await driver.write(sample_pandas_df, "t1", tmp_path, if_exists="append")
+ dep_warnings = [x for x in w if issubclass(x.category, DeprecationWarning)]
+ assert len(dep_warnings) == 1
+ assert "if_exists is deprecated" in str(dep_warnings[0].message)
+
+ dt = DeltaTable(str(tmp_path / "t1"))
+ assert dt.to_pandas().shape[0] == 3
+
+
+# --- create() tests ---
+
+
+@pytest.mark.asyncio
+async def test_create_pandas(driver, sample_pandas_df, tmp_path):
+ """create() creates a Delta table from a Pandas DataFrame."""
+ dest = tmp_path / "new_table"
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for create() using file-based inputs and string paths to cover the new path-handling logic.
The refactored `create()` now normalizes string `path` values and branches on file-type when `data` is a `str`/`Path` (CSV/Excel/Parquet). Current tests only cover in-memory inputs and `Path` destinations. Please add tests that:
1. Use `path` as a string and verify the resulting Delta table is created correctly.
2. Use `data` as a CSV file path written to `tmp_path` and assert the resulting table contents.
3. Optionally repeat for Parquet and one Excel format (e.g. `.xlsx`) to ensure the correct readers are invoked and the new branches are covered.
</issue_to_address>
 if isinstance(data, Path):
-    # open this file with Pandas or Arrow
+    # Read file into Arrow/Pandas based on extension
     ext = data.suffix
suggestion (bug_risk): Consider explicitly handling or rejecting unsupported file extensions when `data` is a `Path`.
For `Path` inputs you only special-case `.csv`, Excel, and `.parquet`; for anything else, the `Path` is passed straight to `write_deltalake`, which expects a DataFrame/Arrow object. Please either raise a clear error for unsupported extensions or validate and document the supported set here rather than silently forwarding the `Path`.
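A minimal sketch of the explicit rejection suggested here, assuming the supported set is exactly the suffixes named in the review (`.csv`, `.xls`/`.xlsx`, `.parquet`). `pick_reader` and `SUPPORTED_READERS` are hypothetical names, and the reader labels are placeholders for whatever reading code the driver actually uses.

```python
from pathlib import Path

# Suffixes named in the review; the labels are placeholders (assumption).
SUPPORTED_READERS = {
    ".csv": "csv",
    ".xls": "excel",
    ".xlsx": "excel",
    ".parquet": "parquet",
}


def pick_reader(data: Path) -> str:
    """Return the reader label for a file, or fail loudly on unknown suffixes."""
    ext = data.suffix.lower()
    try:
        return SUPPORTED_READERS[ext]
    except KeyError:
        supported = ", ".join(sorted(SUPPORTED_READERS))
        raise ValueError(
            f"Unsupported file extension {ext!r} for {data.name}; "
            f"expected one of: {supported}"
        ) from None
```

Failing before the write call keeps a stray `Path` from ever reaching `write_deltalake`, so the caller sees the unsupported suffix rather than a type error from deeper in the stack.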
+ if if_exists is not None:
+     warnings.warn(
+         "if_exists is deprecated, use mode instead",
+         DeprecationWarning,
+         stacklevel=2,
+     )
+     mode = if_exists
suggestion (bug_risk): Deprecated `if_exists` is passed directly to `mode` without validation.
Because `mode` is now typed as `Literal[...]`, assigning `if_exists` directly can allow legacy callers to pass unexpected strings that bypass this contract and flow into `write_deltalake`. Please validate/normalize `if_exists` against the allowed mode values and raise early on invalid input before assigning it to `mode`.
Suggested implementation:

    from pathlib import Path
    from typing import Literal, cast

    # Handle deprecated if_exists parameter
    if if_exists is not None:
        warnings.warn(
            "if_exists is deprecated, use mode instead",
            DeprecationWarning,
            stacklevel=2,
        )
        # Normalize and validate legacy ``if_exists`` values against supported modes
        allowed_modes: set[str] = {"error", "append", "overwrite", "ignore", "merge"}
        normalized_if_exists = str(if_exists).lower()
        if normalized_if_exists not in allowed_modes:
            raise ValueError(
                f"Invalid value for deprecated argument 'if_exists': {if_exists!r}. "
                f"Expected one of: {sorted(allowed_modes)}."
            )
        mode = cast(
            Literal["error", "append", "overwrite", "ignore", "merge"],
            normalized_if_exists,
        )

1. If `asyncdb/drivers/delta.py` already imports `Literal` from `typing`, adjust the import edit so you only add `cast` instead of re-importing `Literal`, for example: `from typing import Literal, cast` → or extend the existing `from typing import ...` line.
2. Ensure the `mode` parameter’s `Literal[...]` annotation (in the function signature you didn’t show) matches the set `{"error", "append", "overwrite", "ignore", "merge"}`. If your project uses a different subset or naming, update both `allowed_modes` and the `Literal[...]` in the cast to the exact set used elsewhere.
+ credentials = {
+     "user": "troc_pgdata",
+     "password": "12345678",
+     "host": "127.0.0.1",
+     "port": "5432",
+     "database": "navigator"
🚨 issue (security): Avoid committing hard-coded database credentials in example scripts.
This example uses literal user/password/host/database values. Even if these are non-sensitive now, it normalizes committing credentials and raises the risk of real ones being pushed. Please load connection details from environment variables or a config file with safe defaults instead of embedding them directly in code.
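As an illustration of the environment-variable approach, the sketch below builds the same `credentials` dictionary without embedding secrets. The function name `load_credentials` and the `PG_*` variable names are hypothetical, as are the fallback values; only the dictionary keys come from the example script.

```python
import os
from typing import Mapping


def load_credentials(env: Mapping[str, str] = os.environ) -> dict:
    """Build connection settings from the environment (variable names are assumptions)."""
    password = env.get("PG_PASSWORD")
    if not password:
        # No default for the secret: fail with a clear message instead.
        raise RuntimeError("Set PG_PASSWORD before running this example")
    return {
        "user": env.get("PG_USER", "postgres"),
        "password": password,
        "host": env.get("PG_HOST", "127.0.0.1"),
        "port": env.get("PG_PORT", "5432"),
        "database": env.get("PG_DATABASE", "postgres"),
    }
```

Accepting the mapping as a parameter keeps the function easy to exercise with a plain dictionary, while the default still reads the live process environment.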
@pytest.mark.asyncio
async def test_write_append(driver, sample_pandas_df, tmp_path):
    """write() with mode='append' creates table then appends rows."""
    await driver.write(sample_pandas_df, "t1", tmp_path, mode="append")
    await driver.write(sample_pandas_df, "t1", tmp_path, mode="append")
    dt = DeltaTable(str(tmp_path / "t1"))
    result = dt.to_pandas()
    assert len(result) == 6
suggestion (testing): Add tests for `create()` using file-based inputs and string paths to cover the new path-handling logic.
The refactored `create()` now normalizes string `path` values and branches on file-type when `data` is a `str`/`Path` (CSV/Excel/Parquet). Current tests only cover in-memory inputs and `Path` destinations. Please add tests that:
1. Use `path` as a string and verify the resulting Delta table is created correctly.
2. Use `data` as a CSV file path written to `tmp_path` and assert the resulting table contents.
3. Optionally repeat for Parquet and one Excel format (e.g. `.xlsx`) to ensure the correct readers are invoked and the new branches are covered.
Summary by Sourcery
Refactor the Delta driver’s create/write APIs to fully wrap delta-rs, add comprehensive tests and updated examples, and record the work in SDD specs and completed task docs.