Skip to content

Commit 18c27f2

Browse files
committed
feat: Add support for rest scan planning
1 parent 1b69a25 commit 18c27f2

File tree

6 files changed

+870
-158
lines changed

6 files changed

+870
-158
lines changed

pyiceberg/catalog/rest/__init__.py

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from collections import deque
1718
from enum import Enum
1819
from typing import (
1920
TYPE_CHECKING,
2021
Any,
2122
Union,
2223
)
2324

24-
from pydantic import ConfigDict, Field, field_validator
25+
from pydantic import ConfigDict, Field, TypeAdapter, field_validator
2526
from requests import HTTPError, Session
2627
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
2728

@@ -36,6 +37,16 @@
3637
)
3738
from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
3839
from pyiceberg.catalog.rest.response import _handle_non_200_response
40+
from pyiceberg.catalog.rest.scan_planning import (
41+
FetchScanTasksRequest,
42+
PlanCancelled,
43+
PlanCompleted,
44+
PlanFailed,
45+
PlanningResponse,
46+
PlanSubmitted,
47+
PlanTableScanRequest,
48+
ScanTasks,
49+
)
3950
from pyiceberg.exceptions import (
4051
AuthorizationExpiredError,
4152
CommitFailedException,
@@ -44,6 +55,7 @@
4455
NamespaceNotEmptyError,
4556
NoSuchIdentifierError,
4657
NoSuchNamespaceError,
58+
NoSuchPlanTaskError,
4759
NoSuchTableError,
4860
NoSuchViewError,
4961
TableAlreadyExistsError,
@@ -56,6 +68,7 @@
5668
CommitTableRequest,
5769
CommitTableResponse,
5870
CreateTableTransaction,
71+
FileScanTask,
5972
StagedTable,
6073
Table,
6174
TableIdentifier,
@@ -315,6 +328,9 @@ class ListViewsResponse(IcebergBaseModel):
315328
identifiers: list[ListViewResponseEntry] = Field()
316329

317330

331+
# Built once at import time and reused by RestCatalog._plan_table_scan to
# validate the JSON body of a scan-planning response into a PlanningResponse.
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)
332+
333+
318334
class RestCatalog(Catalog):
319335
uri: str
320336
_session: Session
@@ -384,6 +400,107 @@ def is_rest_scan_planning_enabled(self) -> bool:
384400
self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
385401
)
386402

403+
@retry(**_RETRY_ARGS)
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:
    """Send a scan-planning request for a table to the REST server.

    The call is wrapped by the retry policy configured in ``_RETRY_ARGS``.

    Args:
        identifier: Table identifier.
        request: Parameters of the scan to plan; serialized by alias with
            ``None`` fields omitted.

    Returns:
        The server's reply parsed as a PlanningResponse, which carries the
        status of the plan request.

    Raises:
        NoSuchTableError: If the server answers with HTTP 404.
    """
    self._check_endpoint(Capability.V1_SUBMIT_TABLE_SCAN_PLAN)

    endpoint = self.url(Endpoints.plan_table_scan, prefixed=True, **self._split_identifier_for_path(identifier))
    payload = request.model_dump_json(by_alias=True, exclude_none=True).encode(UTF8)
    http_response = self._session.post(endpoint, data=payload)

    try:
        http_response.raise_for_status()
    except HTTPError as http_error:
        # Translate the HTTP failure into a catalog exception (404 -> NoSuchTableError).
        _handle_non_200_response(http_error, {404: NoSuchTableError})

    return _PLANNING_RESPONSE_ADAPTER.validate_json(http_response.text)
427+
428+
@retry(**_RETRY_ARGS)
def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> ScanTasks:
    """Retrieve one more batch of scan tasks for a plan-task token.

    The call is wrapped by the retry policy configured in ``_RETRY_ARGS``.

    Args:
        identifier: Table identifier.
        plan_task: Plan-task token taken from an earlier response.

    Returns:
        ScanTasks parsed from the response body; it holds file scan tasks
        and may hold further plan-task tokens.

    Raises:
        NoSuchPlanTaskError: If the server answers with HTTP 404.
    """
    self._check_endpoint(Capability.V1_TABLE_SCAN_PLAN_TASKS)

    payload = FetchScanTasksRequest(plan_task=plan_task).model_dump_json(by_alias=True).encode(UTF8)
    endpoint = self.url(Endpoints.fetch_scan_tasks, prefixed=True, **self._split_identifier_for_path(identifier))
    http_response = self._session.post(endpoint, data=payload)

    try:
        http_response.raise_for_status()
    except HTTPError as http_error:
        # Translate the HTTP failure into a catalog exception (404 -> NoSuchPlanTaskError).
        _handle_non_200_response(http_error, {404: NoSuchPlanTaskError})

    return ScanTasks.model_validate_json(http_response.text)
454+
455+
def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> list[FileScanTask]:
    """Plan a table scan on the server and return the resulting FileScanTasks.

    Submits the plan request, then follows every plan-task token returned
    by the server (breadth-first) until no tokens remain, converting each
    REST file scan task into a FileScanTask along the way.

    Args:
        identifier: Table identifier.
        request: The scan plan request parameters.

    Returns:
        All FileScanTask objects gathered from the initial response and
        from every follow-up batch.

    Raises:
        RuntimeError: If planning failed, was cancelled, or the response is
            of an unexpected type.
        NotImplementedError: If the server chose asynchronous planning,
            which is not supported yet.
    """
    planned = self._plan_table_scan(identifier, request)

    if isinstance(planned, PlanFailed):
        reason = "unknown error" if not planned.error else planned.error.message
        raise RuntimeError(f"Received status: failed: {reason}")
    if isinstance(planned, PlanCancelled):
        raise RuntimeError("Received status: cancelled")
    if isinstance(planned, PlanSubmitted):
        # TODO: implement polling for async planning
        raise NotImplementedError(f"Async scan planning not yet supported for planId: {planned.plan_id}")
    if not isinstance(planned, PlanCompleted):
        raise RuntimeError(f"Invalid planStatus for response: {type(planned).__name__}")

    # Tasks delivered inline with the completed plan.
    collected: list[FileScanTask] = [
        FileScanTask.from_rest_response(rest_task, planned.delete_files) for rest_task in planned.file_scan_tasks
    ]

    # Each fetched page may hand back more tokens; keep draining until none are left.
    queue = deque(planned.plan_tasks)
    while queue:
        page = self._fetch_scan_tasks(identifier, queue.popleft())
        collected.extend(FileScanTask.from_rest_response(rest_task, page.delete_files) for rest_task in page.file_scan_tasks)
        queue.extend(page.plan_tasks)

    return collected
503+
387504
def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
388505
"""Create the LegacyOAuth2AuthManager by fetching required properties.
389506

pyiceberg/catalog/rest/scan_planning.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,16 @@
2525

2626
from pyiceberg.catalog.rest.response import ErrorResponseMessage
2727
from pyiceberg.expressions import BooleanExpression, SerializableBooleanExpression
28-
from pyiceberg.manifest import FileFormat
28+
from pyiceberg.manifest import DataFileContent, FileFormat
2929
from pyiceberg.typedef import IcebergBaseModel
3030

31+
# Maps the `content` string carried by a REST content file to the matching
# DataFileContent enum member used by manifest-level DataFile objects.
# A content string that is not listed here raises KeyError on lookup.
CONTENT_TYPE_MAP: dict[str, DataFileContent] = {
    "data": DataFileContent.DATA,
    "position-deletes": DataFileContent.POSITION_DELETES,
    "equality-deletes": DataFileContent.EQUALITY_DELETES,
}
37+
3138
# Primitive types that can appear in partition values and bounds
3239
PrimitiveTypeValue: TypeAlias = bool | int | float | str | Decimal | UUID | date | time | datetime | bytes
3340

pyiceberg/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ class NoSuchNamespaceError(Exception):
5252
"""Raised when a referenced name-space is not found."""
5353

5454

55+
class NoSuchPlanTaskError(Exception):
    """Raised when a referenced scan plan task cannot be found."""
57+
58+
5559
class RESTError(Exception):
5660
"""Raises when there is an unknown response from the REST Catalog."""
5761

pyiceberg/table/__init__.py

Lines changed: 123 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@
145145
from pyiceberg_core.datafusion import IcebergDataFusionTable
146146

147147
from pyiceberg.catalog import Catalog
148+
from pyiceberg.catalog.rest.scan_planning import (
149+
RESTContentFile,
150+
RESTDeleteFile,
151+
RESTFileScanTask,
152+
)
148153

149154
ALWAYS_TRUE = AlwaysTrue()
150155
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
@@ -1168,6 +1173,8 @@ def scan(
11681173
snapshot_id=snapshot_id,
11691174
options=options,
11701175
limit=limit,
1176+
catalog=self.catalog,
1177+
table_identifier=self._identifier,
11711178
)
11721179

11731180
@property
@@ -1684,6 +1691,8 @@ class TableScan(ABC):
16841691
snapshot_id: int | None
16851692
options: Properties
16861693
limit: int | None
1694+
catalog: Catalog | None
1695+
table_identifier: Identifier | None
16871696

16881697
def __init__(
16891698
self,
@@ -1695,6 +1704,8 @@ def __init__(
16951704
snapshot_id: int | None = None,
16961705
options: Properties = EMPTY_DICT,
16971706
limit: int | None = None,
1707+
catalog: Catalog | None = None,
1708+
table_identifier: Identifier | None = None,
16981709
):
16991710
self.table_metadata = table_metadata
17001711
self.io = io
@@ -1704,6 +1715,8 @@ def __init__(
17041715
self.snapshot_id = snapshot_id
17051716
self.options = options
17061717
self.limit = limit
1718+
self.catalog = catalog
1719+
self.table_identifier = table_identifier
17071720

17081721
def snapshot(self) -> Snapshot | None:
17091722
if self.snapshot_id:
@@ -1798,6 +1811,74 @@ def __init__(
17981811
self.delete_files = delete_files or set()
17991812
self.residual = residual
18001813

1814+
@staticmethod
def from_rest_response(
    rest_task: RESTFileScanTask,
    delete_files: list[RESTDeleteFile],
) -> FileScanTask:
    """Convert a RESTFileScanTask to a FileScanTask.

    Args:
        rest_task: The REST file scan task.
        delete_files: The list of delete files from the ScanTasks response;
            ``rest_task.delete_file_references`` holds indices into it.

    Returns:
        A FileScanTask with the converted data file, the resolved delete
        files, and the task's residual filter (ALWAYS_TRUE when absent).

    Raises:
        IndexError: If a delete file reference does not point inside
            ``delete_files``.
        NotImplementedError: If equality delete files are encountered.
    """
    from pyiceberg.catalog.rest.scan_planning import RESTEqualityDeleteFile

    data_file = _rest_file_to_data_file(rest_task.data_file)

    resolved_deletes: set[DataFile] = set()
    for idx in rest_task.delete_file_references or ():
        # Plain list indexing would let a negative reference wrap around and
        # silently attach the wrong delete file, so check the bounds explicitly.
        if not 0 <= idx < len(delete_files):
            raise IndexError(f"Delete file reference {idx} is out of range for {len(delete_files)} delete file(s)")
        delete_file = delete_files[idx]
        if isinstance(delete_file, RESTEqualityDeleteFile):
            raise NotImplementedError(f"PyIceberg does not yet support equality deletes: {delete_file.file_path}")
        resolved_deletes.add(_rest_file_to_data_file(delete_file))

    return FileScanTask(
        data_file=data_file,
        delete_files=resolved_deletes,
        residual=rest_task.residual_filter if rest_task.residual_filter else ALWAYS_TRUE,
    )
1848+
1849+
1850+
def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile:
    """Build a manifest DataFile from a REST content file.

    Column-level metrics (sizes and value/null/NaN counts) are copied only
    when ``rest_file`` is a RESTDataFile; for any other content file they
    are left as ``None``. The partition spec id is set on the result after
    construction.
    """
    from pyiceberg.catalog.rest.scan_planning import CONTENT_TYPE_MAP, RESTDataFile

    if isinstance(rest_file, RESTDataFile):
        column_sizes, value_counts, null_value_counts, nan_value_counts = (
            stat.to_dict() if stat else None
            for stat in (
                rest_file.column_sizes,
                rest_file.value_counts,
                rest_file.null_value_counts,
                rest_file.nan_value_counts,
            )
        )
    else:
        column_sizes = value_counts = null_value_counts = nan_value_counts = None

    converted = DataFile.from_args(
        content=CONTENT_TYPE_MAP[rest_file.content],
        file_path=rest_file.file_path,
        file_format=rest_file.file_format,
        # A missing or empty partition becomes an empty Record.
        partition=Record(*(rest_file.partition or ())),
        record_count=rest_file.record_count,
        file_size_in_bytes=rest_file.file_size_in_bytes,
        column_sizes=column_sizes,
        value_counts=value_counts,
        null_value_counts=null_value_counts,
        nan_value_counts=nan_value_counts,
        split_offsets=rest_file.split_offsets,
        sort_order_id=rest_file.sort_order_id,
    )
    converted.spec_id = rest_file.spec_id
    return converted
1881+
18011882

18021883
def _open_manifest(
18031884
io: FileIO,
@@ -1970,12 +2051,35 @@ def scan_plan_helper(self) -> Iterator[list[ManifestEntry]]:
19702051
],
19712052
)
19722053

1973-
def plan_files(self) -> Iterable[FileScanTask]:
1974-
"""Plans the relevant files by filtering on the PartitionSpecs.
2054+
def _should_use_rest_planning(self) -> bool:
    """Return True when this scan's catalog is a RestCatalog with scan planning enabled."""
    from pyiceberg.catalog.rest import RestCatalog

    catalog = self.catalog
    if isinstance(catalog, RestCatalog):
        return catalog.is_rest_scan_planning_enabled()
    return False
2061+
2062+
def _plan_files_rest(self) -> Iterable[FileScanTask]:
    """Plan files by delegating to the REST catalog's server-side scan planning.

    Returns:
        The FileScanTasks returned by ``RestCatalog.plan_scan``.

    Raises:
        TypeError: If the scan's catalog is not a RestCatalog.
        ValueError: If the scan has no table identifier.
    """
    from pyiceberg.catalog.rest import RestCatalog
    from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest

    rest_catalog = self.catalog
    if not isinstance(rest_catalog, RestCatalog):
        raise TypeError("REST scan planning requires a RestCatalog")

    identifier = self.table_identifier
    if identifier is None:
        raise ValueError("REST scan planning requires a table identifier")

    # A "select everything" projection and an always-true filter are sent as None.
    projection = None
    if self.selected_fields != ("*",):
        projection = list(self.selected_fields)

    predicate = None
    if self.row_filter != ALWAYS_TRUE:
        predicate = self.row_filter

    plan_request = PlanTableScanRequest(
        snapshot_id=self.snapshot_id,
        select=projection,
        filter=predicate,
        case_sensitive=self.case_sensitive,
    )
    return rest_catalog.plan_scan(identifier, plan_request)
2080+
2081+
def _plan_files_local(self) -> Iterable[FileScanTask]:
2082+
"""Plan files locally by reading manifests."""
19792083
data_entries: list[ManifestEntry] = []
19802084
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
19812085

@@ -2006,6 +2110,20 @@ def plan_files(self) -> Iterable[FileScanTask]:
20062110
for data_entry in data_entries
20072111
]
20082112

2113+
def plan_files(self) -> Iterable[FileScanTask]:
    """Plan the files this scan has to read.

    Server-side planning is used when ``_should_use_rest_planning()``
    reports that the table's REST catalog has scan planning enabled;
    in every other case the files are planned locally.

    Returns:
        List of FileScanTasks that contain both data and delete files.
    """
    planner = self._plan_files_rest if self._should_use_rest_planning() else self._plan_files_local
    return planner()
2126+
20092127
def to_arrow(self) -> pa.Table:
20102128
"""Read an Arrow table eagerly from this DataScan.
20112129

0 commit comments

Comments
 (0)