Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Streaming physical writes for native executor #2992

Merged
merged 18 commits into from
Oct 31, 2024
28 changes: 28 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[dependencies]
common-buffer = {path = "src/common/buffer", default-features = false}
common-daft-config = {path = "src/common/daft-config", default-features = false}
common-display = {path = "src/common/display", default-features = false}
common-file-formats = {path = "src/common/file-formats", default-features = false}
Expand All @@ -21,12 +22,14 @@ daft-local-execution = {path = "src/daft-local-execution", default-features = fa
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
daft-sql = {path = "src/daft-sql", default-features = false}
daft-stats = {path = "src/daft-stats", default-features = false}
daft-table = {path = "src/daft-table", default-features = false}
daft-writers = {path = "src/daft-writers", default-features = false}
lazy_static = {workspace = true}
log = {workspace = true}
lzma-sys = {version = "*", features = ["static"]}
Expand All @@ -48,6 +51,7 @@ python = [
"daft-json/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-physical-plan/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
Expand All @@ -56,6 +60,7 @@ python = [
"daft-table/python",
"daft-functions/python",
"daft-functions-json/python",
"daft-writers/python",
"common-daft-config/python",
"common-system-info/python",
"common-display/python",
Expand Down Expand Up @@ -138,6 +143,7 @@ members = [
"src/daft-functions",
"src/daft-functions-json",
"src/daft-sql",
"src/daft-writers",
"src/hyperloglog"
]

Expand Down
152 changes: 152 additions & 0 deletions daft/io/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import uuid
from abc import ABC, abstractmethod
from typing import Optional, Union

Check warning on line 3 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L1-L3

Added lines #L1 - L3 were not covered by tests

from daft.daft import IOConfig
from daft.dependencies import pa, pacsv, pq
from daft.filesystem import (

Check warning on line 7 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L5-L7

Added lines #L5 - L7 were not covered by tests
_resolve_paths_and_filesystem,
canonicalize_protocol,
get_protocol_from_path,
)
from daft.series import Series
from daft.table.micropartition import MicroPartition
from daft.table.partitioning import (

Check warning on line 14 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L12-L14

Added lines #L12 - L14 were not covered by tests
partition_strings_to_path,
partition_values_to_str_mapping,
)
from daft.table.table import Table

Check warning on line 18 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L18

Added line #L18 was not covered by tests


class FileWriterBase(ABC):
    """Abstract base for single-file writers.

    Resolves the output path and filesystem up front, then leaves the actual
    serialization to format-specific subclasses via `write`/`close`.
    """

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        file_format: str,
        partition_values: Optional[Table] = None,
        compression: Optional[str] = None,
        io_config: Optional[IOConfig] = None,
        default_partition_fallback: str = "__HIVE_DEFAULT_PARTITION__",
    ):
        resolved_paths, self.fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
        [self.resolved_path] = resolved_paths
        proto = get_protocol_from_path(root_dir)
        is_local_fs = canonicalize_protocol(proto) == "file"

        # Unique file name per writer: the uuid guards against collisions,
        # file_idx distinguishes files produced for the same root_dir.
        self.file_name = f"{uuid.uuid4()}-{file_idx}.{file_format}"
        self.partition_values = partition_values
        if self.partition_values is None:
            self.dir_path = f"{self.resolved_path}"
        else:
            str_mapping = partition_values_to_str_mapping(self.partition_values)
            # Each partition-value series is expected to hold a single row
            # for this writer, hence taking element [0].
            partition_strings = {name: series.to_pylist()[0] for name, series in str_mapping.items()}
            self.dir_path = partition_strings_to_path(self.resolved_path, partition_strings, default_partition_fallback)

        self.full_path = f"{self.dir_path}/{self.file_name}"
        if is_local_fs:
            # Only local filesystems need the directory created ahead of
            # time; object stores create "directories" implicitly.
            self.fs.create_dir(self.dir_path, recursive=True)

        self.compression = compression if compression is not None else "none"
        # Created lazily by subclasses on the first `write` call.
        self.current_writer: Optional[Union[pq.ParquetWriter, pacsv.CSVWriter]] = None

    @abstractmethod
    def write(self, table: MicroPartition) -> None:
        """Write data to the file using the appropriate writer.

        Args:
            table: MicroPartition containing the data to be written.
        """
        pass

    @abstractmethod
    def close(self) -> Table:
        """Close the writer and return metadata about the written file.

        Returns:
            Table containing metadata about the written file, including path and partition values.
        """
        pass

class ParquetFileWriter(FileWriterBase):
    """Writes MicroPartitions to a single parquet file."""

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        partition_values: Optional[Table] = None,
        compression: str = "none",
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(
            root_dir=root_dir,
            file_idx=file_idx,
            file_format="parquet",
            partition_values=partition_values,
            compression=compression,
            io_config=io_config,
        )

    def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
        # Writer construction is deferred until the first batch arrives so
        # the schema can be taken from the incoming data.
        return pq.ParquetWriter(
            self.full_path,
            schema,
            compression=self.compression,
            use_compliant_nested_type=False,
            filesystem=self.fs,
        )

    def write(self, table: MicroPartition) -> None:
        """Append `table` to the parquet file, opening the writer on first use."""
        if self.current_writer is None:
            arrow_schema = table.schema().to_pyarrow_schema()
            self.current_writer = self._create_writer(arrow_schema)
        self.current_writer.write_table(table.to_arrow())

    def close(self) -> Table:
        """Close the underlying writer and return a one-row metadata table.

        Returns:
            Table with the written file's path plus any partition-value columns.
        """
        if self.current_writer is not None:
            self.current_writer.close()

        metadata = {"path": Series.from_pylist([self.full_path])}
        if self.partition_values is not None:
            for name in self.partition_values.column_names():
                metadata[name] = self.partition_values.get_column(name)
        return Table.from_pydict(metadata)


class CSVFileWriter(FileWriterBase):
    """Writes MicroPartitions to a single CSV file."""

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        partition_values: Optional[Table] = None,
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(
            root_dir=root_dir,
            file_idx=file_idx,
            file_format="csv",
            partition_values=partition_values,
            io_config=io_config,
        )

    def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
        # Writer construction is deferred until the first batch arrives so
        # the schema can be taken from the incoming data.
        return pacsv.CSVWriter(
            self.full_path,
            schema,
        )

    def write(self, table: MicroPartition) -> None:
        """Append `table` to the CSV file, opening the writer on first use."""
        if self.current_writer is None:
            arrow_schema = table.schema().to_pyarrow_schema()
            self.current_writer = self._create_writer(arrow_schema)
        self.current_writer.write_table(table.to_arrow())

    def close(self) -> Table:
        """Close the underlying writer and return a one-row metadata table.

        Returns:
            Table with the written file's path plus any partition-value columns.
        """
        if self.current_writer is not None:
            self.current_writer.close()

        metadata = {"path": Series.from_pylist([self.full_path])}
        if self.partition_values is not None:
            for name in self.partition_values.column_names():
                metadata[name] = self.partition_values.get_column(name)
        return Table.from_pydict(metadata)
38 changes: 25 additions & 13 deletions daft/table/partitioning.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,42 @@
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from daft import Series
from daft.expressions import ExpressionsProjection
from daft.table.table import Table

from .micropartition import MicroPartition


def partition_strings_to_path(
    root_path: str,
    parts: Dict[str, str],
    partition_null_fallback: str = "__HIVE_DEFAULT_PARTITION__",
) -> str:
    """Build a hive-style partition path `root/key1=val1/key2=val2`.

    `None` partition values are replaced with `partition_null_fallback`.
    """
    segments = []
    for key, value in parts.items():
        filled = partition_null_fallback if value is None else value
        segments.append(f"{key}={filled}")
    # Note: an empty `parts` mapping yields "root_path/" (trailing slash),
    # matching the original string-join behavior.
    return f"{root_path}/" + "/".join(segments)


def partition_values_to_str_mapping(
    partition_values: Union[MicroPartition, Table],
) -> Dict[str, Series]:
    """Map each partition column name to its stringified value series.

    Null entries are kept as nulls (via the sentinel series) so that callers
    can later substitute a default-partition fallback string for them.
    """
    # Single-null series used by `if_else` to re-insert nulls after the
    # string conversion.
    null_sentinel = Series.from_pylist([None])

    mapping: Dict[str, Series] = {}
    for name in partition_values.column_names():
        column = partition_values.get_column(name)
        as_strings = column._to_str_values()
        mapping[name] = column.is_null().if_else(null_sentinel, as_strings)
    return mapping


class PartitionedTable:
def __init__(self, table: MicroPartition, partition_keys: Optional[ExpressionsProjection]):
self.table = table
Expand Down Expand Up @@ -56,20 +78,10 @@ def partition_values_str(self) -> Optional[MicroPartition]:

If the table is not partitioned, returns None.
"""
null_part = Series.from_pylist([None])
partition_values = self.partition_values()

if partition_values is None:
return None
else:
pkey_names = partition_values.column_names()

partition_strings = {}

for c in pkey_names:
column = partition_values.get_column(c)
string_names = column._to_str_values()
null_filled = column.is_null().if_else(null_part, string_names)
partition_strings[c] = null_filled

partition_strings = partition_values_to_str_mapping(partition_values)
return MicroPartition.from_pydict(partition_strings)
10 changes: 10 additions & 0 deletions src/common/buffer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[dependencies]
common-error = {path = "../error", default-features = false}

[lints]
workspace = true

[package]
edition = {workspace = true}
name = "common-buffer"
version = {workspace = true}
Loading
Loading