Skip to content

Commit b31ae80

Browse files
Implementing to_iceberg
1 parent 1875fb5 commit b31ae80

File tree

3 files changed

+148
-7
lines changed

3 files changed

+148
-7
lines changed

pandas/core/frame.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3547,6 +3547,54 @@ def to_xml(
35473547

35483548
return xml_formatter.write_output()
35493549

3550+
def to_iceberg(
3551+
self,
3552+
table_identifier: str,
3553+
catalog_name: str | None = None,
3554+
*,
3555+
catalog_properties: dict[str, Any] | None = None,
3556+
location: str | None = None,
3557+
snapshot_properties: dict[str, str] | None = None,
3558+
):
3559+
"""
3560+
Write a DataFrame to an Apache Iceberg table.
3561+
3562+
.. versionadded:: 3.0.0
3563+
3564+
Parameters
3565+
----------
3566+
table_identifier : str
3567+
Table identifier.
3568+
catalog_name : str, optional
3569+
The name of the catalog.
3570+
catalog_properties : dict of {str: str}, optional
3571+
The properties that are used next to the catalog configuration.
3572+
location : str, optional
3573+
Location for the table.
3574+
snapshot_properties : dict of {str: str}, optional
3575+
Custom properties to be added to the snapshot summary
3576+
3577+
See Also
3578+
--------
3579+
read_iceberg : Read an Apache Iceberg table.
3580+
DataFrame.to_parquet : Write a DataFrame in Parquet format.
3581+
3582+
Examples
3583+
--------
3584+
>>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
3585+
>>> df.to_iceberg("my_table", catalog_name="my_catalog")
3586+
"""
3587+
from pandas.io.iceberg import to_iceberg
3588+
3589+
return to_iceberg(
3590+
self,
3591+
table_identifier,
3592+
catalog_name,
3593+
catalog_properties=catalog_properties,
3594+
location=location,
3595+
snapshot_properties=snapshot_properties,
3596+
)
3597+
35503598
# ----------------------------------------------------------------------
35513599
@doc(INFO_DOCSTRING, **frame_sub_kwargs)
35523600
def info(

pandas/io/iceberg.py

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,17 @@
77
from pandas import DataFrame
88

99

10+
def _get_catalog(catalog_name: str | None, catalog_properties: dict[str, Any] | None):
11+
pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
12+
if catalog_properties is None:
13+
catalog_properties = {}
14+
return pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
15+
16+
1017
def read_iceberg(
1118
table_identifier: str,
1219
catalog_name: str | None = None,
20+
*,
1321
catalog_properties: dict[str, Any] | None = None,
1422
row_filter: str | None = None,
1523
selected_fields: tuple[str] | None = None,
@@ -69,12 +77,8 @@ def read_iceberg(
6977
... selected_fields=("VendorID", "tpep_pickup_datetime"),
7078
... ) # doctest: +SKIP
7179
"""
72-
pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
80+
catalog = _get_catalog(catalog_name, catalog_properties)
7381
pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")
74-
75-
if catalog_properties is None:
76-
catalog_properties = {}
77-
catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
7882
table = catalog.load_table(table_identifier)
7983
if row_filter is None:
8084
row_filter = pyiceberg_expressions.AlwaysTrue()
@@ -91,3 +95,56 @@ def read_iceberg(
9195
limit=limit,
9296
)
9397
return result.to_pandas()
98+
99+
100+
def to_iceberg(
101+
df: DataFrame,
102+
table_identifier: str,
103+
catalog_name: str | None = None,
104+
*,
105+
catalog_properties: dict[str, Any] | None = None,
106+
location: str | None = None,
107+
snapshot_properties: dict[str, str] | None = None,
108+
):
109+
"""
110+
Write a DataFrame to an Apache Iceberg table.
111+
112+
.. versionadded:: 3.0.0
113+
114+
Parameters
115+
----------
116+
table_identifier : str
117+
Table identifier.
118+
catalog_name : str, optional
119+
The name of the catalog.
120+
catalog_properties : dict of {str: str}, optional
121+
The properties that are used next to the catalog configuration.
122+
location : str, optional
123+
Location for the table.
124+
snapshot_properties : dict of {str: str}, optional
125+
Custom properties to be added to the snapshot summary
126+
127+
See Also
128+
--------
129+
read_iceberg : Read an Apache Iceberg table.
130+
DataFrame.to_parquet : Write a DataFrame in Parquet format.
131+
132+
Examples
133+
--------
134+
>>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
135+
>>> df.to_iceberg("my_table", catalog_name="my_catalog")
136+
"""
137+
pa = import_optional_dependency("pyarrow")
138+
139+
catalog = _get_catalog(catalog_name, catalog_properties)
140+
arrow_table = pa.Table.from_pandas(df)
141+
table = catalog.create_table_if_not_exists(
142+
identifier=table_identifier,
143+
schema=arrow_table.schema,
144+
location=location,
145+
# we could add `partition_spec`, `sort_order` and `properties` in the
146+
# future, but it may not be trivial without exposing PyIceberg objects
147+
)
148+
if snapshot_properties is None:
149+
snapshot_properties = {}
150+
table.append(arrow_table, snapshot_properties=snapshot_properties)

pandas/tests/io/test_iceberg.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
pyiceberg_catalog = pytest.importorskip("pyiceberg.catalog")
2323
pq = pytest.importorskip("pyarrow.parquet")
2424

25-
Catalog = collections.namedtuple("Catalog", ["name", "uri"])
25+
Catalog = collections.namedtuple("Catalog", ["name", "uri", "warehouse"])
2626

2727

2828
@pytest.fixture
@@ -58,7 +58,7 @@ def catalog(request, tmp_path):
5858

5959
importlib.reload(pyiceberg_catalog) # needed to reload the config file
6060

61-
yield Catalog(name=catalog_name or "default", uri=uri)
61+
yield Catalog(name=catalog_name or "default", uri=uri, warehouse=warehouse)
6262

6363
if catalog_name is not None:
6464
config_path.unlink()
@@ -141,3 +141,39 @@ def test_read_with_limit(self, catalog):
141141
limit=2,
142142
)
143143
tm.assert_frame_equal(result, expected)
144+
145+
def test_write(self, catalog):
146+
df = pd.DataFrame(
147+
{
148+
"A": [1, 2, 3],
149+
"B": ["foo", "foo", "foo"],
150+
}
151+
)
152+
df.to_iceberg(
153+
"ns.new_table",
154+
catalog_properties={"uri": catalog.uri},
155+
location=catalog.warehouse,
156+
)
157+
result = read_iceberg(
158+
"ns.new_table",
159+
catalog_properties={"uri": catalog.uri},
160+
)
161+
tm.assert_frame_equal(result, df)
162+
163+
@pytest.mark.parametrize("catalog", ["default", "pandas_tests"], indirect=True)
164+
def test_write_by_catalog_name(self, catalog):
165+
df = pd.DataFrame(
166+
{
167+
"A": [1, 2, 3],
168+
"B": ["foo", "foo", "foo"],
169+
}
170+
)
171+
df.to_iceberg(
172+
"ns.new_table",
173+
catalog_name=catalog.name,
174+
)
175+
result = read_iceberg(
176+
"ns.new_table",
177+
catalog_name=catalog.name,
178+
)
179+
tm.assert_frame_equal(result, df)

0 commit comments

Comments
 (0)