Skip to content

Add SnowflakeFile mocked stage and snowurl support #3479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 54 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
e167c98
implement snowflakeFile methods for local testing with a local path
sfc-gh-lju Jun 6, 2025
6968715
update changelog
sfc-gh-lju Jun 11, 2025
d3883fd
move udf tests to test_udf_files
sfc-gh-lju Jun 12, 2025
4187b96
Merge branch 'main' into lju-SNOW-2110572-SnowflakeFile-local-path
sfc-gh-lju Jun 12, 2025
21ea2a3
use helper functions
sfc-gh-lju Jun 14, 2025
95d8843
update error message
sfc-gh-lju Jun 16, 2025
ce62272
fix gw error by removing generate_random_alphanumeric from parameter
sfc-gh-lju Jun 16, 2025
09746fb
use Union typing for python 3.9 compatability
sfc-gh-lju Jun 16, 2025
e73c5b8
use Union typing for python 3.9 compatability
sfc-gh-lju Jun 16, 2025
eadb339
use tmp_path instead of our defined fixture
sfc-gh-lju Jun 16, 2025
6f2ff13
fix windows error with relative path
sfc-gh-lju Jun 16, 2025
10be164
use self._pos instead of tell()
sfc-gh-lju Jun 16, 2025
5faf383
use size.nbytes for self._pos
sfc-gh-lju Jun 16, 2025
7199de6
return self._pos
sfc-gh-lju Jun 17, 2025
0b80677
update self._pos in readinto
sfc-gh-lju Jun 17, 2025
7b9cd25
implement mocked stage support
sfc-gh-lju Jun 18, 2025
c9ba88f
Merge branch 'main' into lju-SNOW-2110578-SnowflakeFile-mocked-stage-…
sfc-gh-lju Jun 23, 2025
00747b0
Make docstring explaining the functionality of the SFFile Class
sfc-gh-lju Jun 20, 2025
e9de88e
add link to github for bug reports
sfc-gh-lju Jun 20, 2025
63cc4f6
use issues tab link
sfc-gh-lju Jun 20, 2025
1cc1ef8
fix rebase errors
sfc-gh-lju Jun 23, 2025
660946e
add changelog
sfc-gh-lju Jun 23, 2025
2ba0ac1
add testing for no session and multiple sessions
sfc-gh-lju Jun 23, 2025
4f977c5
fix session close
sfc-gh-lju Jun 23, 2025
555a6c7
remove session.close, as it's causing bugs with other tests
sfc-gh-lju Jun 23, 2025
b459c82
fix bug where no default session is found
sfc-gh-lju Jun 23, 2025
d3a0fb4
use session per function
sfc-gh-lju Jun 23, 2025
7f5b478
fix issue of same file being read/written to in different tests
sfc-gh-lju Jun 23, 2025
008f51b
remove random file location
sfc-gh-lju Jun 23, 2025
bff2b85
fix return types
sfc-gh-lju Jun 23, 2025
42fd7e6
Merge branch 'main' into lju-SNOW-2110578-SnowflakeFile-mocked-stage-…
sfc-gh-lju Jul 1, 2025
644dc6a
re-add write mode comments for seek and tell
sfc-gh-lju Jul 1, 2025
c36af16
remove excessive tests
sfc-gh-lju Jul 1, 2025
9e5d08a
make changelog changes
sfc-gh-lju Jul 1, 2025
d8099eb
fix bug with previous stage writes affecting current test
sfc-gh-lju Jul 1, 2025
522977c
enable windows support for tests with escape characters
sfc-gh-lju Jul 1, 2025
520f6fd
fix windows bug with readinto escape chars
sfc-gh-lju Jul 1, 2025
ccc5560
append binary string for windows test failures and fix coverage issue
sfc-gh-lju Jul 2, 2025
6fce59b
fix windows failures
sfc-gh-lju Jul 2, 2025
73f8a74
Delete tests/integ/test_udf_files.py
sfc-gh-lju Jul 2, 2025
9a484e1
escape \r in windows testing
sfc-gh-lju Jul 2, 2025
12e5167
fix windows failures for mock tests
sfc-gh-lju Jul 2, 2025
5e10655
fix windows please
sfc-gh-lju Jul 2, 2025
f5c8202
fix windows testS
sfc-gh-lju Jul 2, 2025
c7cbee3
Update tests/mock/test_udf_files.py
sfc-gh-lju Jul 2, 2025
b12f239
add unicode tests and update docs
sfc-gh-lju Jul 2, 2025
46ae930
enable unicode support in tests
sfc-gh-lju Jul 2, 2025
1dae622
fix unicode errors
sfc-gh-lju Jul 2, 2025
303ee0e
revert readlines change
sfc-gh-lju Jul 3, 2025
d956c45
fix utf encoding error for local file stream
sfc-gh-lju Jul 3, 2025
fd0d48d
re-add tests and use snow URL for changelog
sfc-gh-lju Jul 6, 2025
24cd286
refactor tests
sfc-gh-lju Jul 8, 2025
ef6189b
Merge branch 'main' into lju-SNOW-2110578-SnowflakeFile-mocked-stage-…
sfc-gh-lju Jul 8, 2025
7475728
Merge branch 'main' into lju-SNOW-2110578-SnowflakeFile-mocked-stage-…
sfc-gh-lju Jul 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ local ingestion. By default, local ingestion uses multithreading. Multiprocessin

### Snowpark Local Testing Updates

- Added local testing support for reading files with `SnowflakeFile` using local file paths.
- Added local testing support for reading files with `SnowflakeFile` using local file paths, stage paths (@stage/file_path), and the Snow URL semantic (snow://...).

#### Bug Fixes

Expand Down
156 changes: 98 additions & 58 deletions src/snowflake/snowpark/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import array
import sys
import tempfile
import os
from io import (
RawIOBase,
UnsupportedOperation,
Expand All @@ -17,8 +16,12 @@
SEEK_END,
SEEK_CUR,
)
from snowflake.snowpark._internal.utils import RELATIVE_PATH_PREFIX
from snowflake.snowpark._internal.utils import (
RELATIVE_PATH_PREFIX,
SNOWFLAKE_PATH_PREFIXES,
)
from typing import Sequence
from snowflake.snowpark.context import get_active_session
import logging

# Python 3.8 needs to use typing.Iterable because collections.abc.Iterable is not subscriptable
Expand All @@ -29,13 +32,13 @@
else:
from collections.abc import Iterable

_NON_LOCAL_PATH_ERR_MSG = "SnowflakeFile currently supports only relative paths and read apis in local testing mode."

_WRITE_MODE_ERR_MSG = (
"SnowflakeFile currently doesn't support write APIs in local testing mode."
)
_DEFER_IMPLEMENTATION_ERR_MSG = "Not yet supported in UDF and Stored Procedures."
READ_MODES = ["r", "rb"]
WRITE_MODES = ["w", "wb"]
_READ_MODES = ["r", "rb"]
_WRITE_MODES = ["w", "wb"]
_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -76,8 +79,9 @@ class SnowflakeFile(RawIOBase):

These examples are using the client, but this same pattern can be used inside SQL-defined UDFs.

We provide a local implementation of SnowflakeFile to aid in local testing. This currently only supports using read APIs on relative paths.
Local testing to Snowflake stages is not yet supported.
We provide a local implementation of SnowflakeFile to aid in local testing.
This currently only supports using read APIs on relative paths and mocked stages
(sessions in local testing mode that aren't connected to a real stage).

Note:
1. All of the implementation in this file is for local testing purposes.
Expand Down Expand Up @@ -112,21 +116,50 @@ def __init__(
self.encoding = None
self.errors = None

# Attributes required for local testing functionality
_DEFAULT_READ_BUFFER_SIZE = 32 * 1024
if mode in READ_MODES:
# Buffered Reader used to support BufferedIOBase methods such as read1 and readinto1
self._file_stream = BufferedReader(
open(self._file_location, self._mode), _DEFAULT_READ_BUFFER_SIZE
)
elif mode in WRITE_MODES:
# need to still open a file stream
self._file_stream = open(self._file_location, self._mode)
self._pos = 0
self._is_local_file = (
True
if self._file_location.startswith((RELATIVE_PATH_PREFIX, "C:"))
else False
)
self._is_stage_file = (
True
if self._file_location.startswith(tuple(SNOWFLAKE_PATH_PREFIXES))
else False
)

# Validate the file location
if not self._is_local_file and not self._is_stage_file:
raise ValueError(
f"Invalid file location '{self._file_location}'. "
"File location must be a local file path or a stage file URL."
)

self._file_size = 0
if self._is_local_file and mode in _READ_MODES:
# Buffered Reader used to support BufferedIOBase methods such as read1 and readinto1
encoding = "utf-8" if mode == "r" else None
self._file_stream = BufferedReader(
open(self._file_location, self._mode, encoding=encoding),
_DEFAULT_READ_BUFFER_SIZE,
)

# SEEK_CUR and SEEK_END are only supported in binary mode for Python IO so we must obtain the size of the file
# in a different way. We can use the raw stream to get the size of the file.
temp_file = open(self._file_location, "rb")
self._file_size = temp_file.seek(0, SEEK_END)
temp_file.close()
elif self._is_stage_file and mode in _READ_MODES:
self._file_stream = get_active_session().file.get_stream(
self._file_location
)
self._file_size = self._file_stream.seek(0, SEEK_END)
self._file_stream.seek(0, SEEK_SET)
elif mode in _WRITE_MODES:
# Need to open a file stream for local testing to ensure APIs work in write mode
self._file_stream = open(self._file_location, self._mode)

@classmethod
def open(
Expand Down Expand Up @@ -155,7 +188,7 @@ def open(
is_owner_file: (Deprecated) A boolean value, if True, the API is intended to access owner's files and all URI/URL are allowed. If False, the API is intended to access files passed into the function by the caller and only scoped URL is allowed.
require_scoped_url: A boolean value, if True, file_location must be a scoped URL. A scoped URL ensures that the caller cannot access the UDF owners files that the caller does not have access to.
"""
if mode not in READ_MODES:
if mode not in _READ_MODES:
raise ValueError(
f"Invalid mode '{mode}' for SnowflakeFile.open. Supported modes are 'r' and 'rb'."
)
Expand All @@ -173,7 +206,7 @@ def open_new_result(cls, mode: str = "w") -> SnowflakeFile:
Args:
mode: A string used to mark the type of an IO stream. Supported modes are "w" for text write and "wb" for binary write.
"""
if mode not in WRITE_MODES:
if mode not in _WRITE_MODES:
raise ValueError(
f"Invalid mode '{mode}' for SnowflakeFile.open_new_result. Supported modes are 'w' and 'wb'."
)
Expand All @@ -188,14 +221,14 @@ def _raise_if_not_read(self) -> None:
"""
Internal function to validate read mode of the file object before performing an IO operation.
"""
if self._mode not in READ_MODES:
if self._mode not in _READ_MODES:
raise UnsupportedOperation(f"Not readable mode={self._mode}")

def _raise_if_not_write(self) -> None:
"""
Internal function to validate write mode of the file object before performing a IO operation.
"""
if self._mode not in WRITE_MODES:
if self._mode not in _WRITE_MODES:
raise UnsupportedOperation(f"Not writable mode={self._mode}")

def _raise_if_closed(self) -> None:
Expand Down Expand Up @@ -245,7 +278,7 @@ def fileno(self) -> int:

def flush(self) -> None:
"""
Fail if the stream is closed. Does nothing
Fail if the stream is closed. Does nothing.
"""
self._raise_if_closed()
pass
Expand All @@ -271,9 +304,12 @@ def read(self, size: int = -1) -> Sequence:
self._raise_if_not_read()
if self._is_local_file:
content = self._file_stream.raw.read(size)
self._pos += len(content)
return content
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
elif self._is_stage_file:
content = self._file_stream.read(size)
if self._mode == "r":
content = content.decode()
self._pos += len(content)
return content

def read1(self, size: int = -1) -> Sequence:
"""
Expand All @@ -288,11 +324,12 @@ def read1(self, size: int = -1) -> Sequence:
if self._is_local_file:
if self._mode == "r":
content = self.read(size).encode()
else:
elif self._mode == "rb":
content = self._file_stream.read1(size)
self._pos += len(content)
return content
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
elif self._is_stage_file:
content = self._file_stream.read1(size)
self._pos += len(content)
return content

def readable(self) -> bool:
"""
Expand Down Expand Up @@ -326,9 +363,10 @@ def readinto(self, b: bytes | bytearray | array.array) -> int:
if self._mode == "r":
return self._read_into_buffer(b)
size = self._file_stream.raw.readinto(b)
self._pos += size
return size
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
elif self._is_stage_file:
size = self._file_stream.readinto(b)
self._pos += size
return size

def readinto1(self, b: bytes | bytearray | array.array) -> int:
"""
Expand All @@ -342,9 +380,10 @@ def readinto1(self, b: bytes | bytearray | array.array) -> int:
if self._mode == "r":
return self._read_into_buffer(b)
size = self._file_stream.readinto1(b)
self._pos += size
return size
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
elif self._is_stage_file:
size = self._file_stream.readinto1(b)
self._pos += size
return size

def readline(self, size: int = -1) -> Sequence:
"""
Expand All @@ -356,9 +395,12 @@ def readline(self, size: int = -1) -> Sequence:
self._raise_if_not_read()
if self._is_local_file:
content = self._file_stream.raw.readline(size)
self._pos += len(content)
return content
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
elif self._is_stage_file:
content = self._file_stream.readline(size)
if self._mode == "r":
content = content.decode()
self._pos += len(content)
return content

def readlines(self, hint: int = -1) -> list[Sequence]:
"""
Expand All @@ -375,9 +417,12 @@ def readlines(self, hint: int = -1) -> list[Sequence]:
self._raise_if_not_read()
if self._is_local_file:
content = self._file_stream.raw.readlines(hint)
self._pos += sum(len(line) for line in content)
return content
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
elif self._is_stage_file:
content = self._file_stream.readlines(hint)
if self._mode == "r":
content = [line.decode() for line in content]
self._pos += sum(len(line) for line in content)
return content

def seek(self, offset: int, whence: int = SEEK_SET) -> int:
"""
Expand All @@ -392,19 +437,18 @@ def seek(self, offset: int, whence: int = SEEK_SET) -> int:
"""
self._raise_if_closed()
self._raise_if_not_read()
if self._is_local_file:
if whence == SEEK_SET:
self._pos = offset
elif whence == SEEK_CUR:
self._pos = self._file_stream.tell() + offset
elif whence == SEEK_END:
self._pos = os.path.getsize(self._file_location) + offset
else:
raise NotImplementedError(f"Unsupported whence value {whence}")
if self._pos < 0:
raise ValueError(f"Negative seek position {self._pos}")
return self._file_stream.seek(self._pos, SEEK_SET)
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
if whence == SEEK_SET:
pos = offset
elif whence == SEEK_CUR:
pos = self._file_stream.tell() + offset
elif whence == SEEK_END:
pos = self._file_size + offset
else:
raise NotImplementedError(f"Unsupported whence value {whence}")
if pos < 0:
raise ValueError(f"Negative seek position {pos}")
self._pos = pos
return self._file_stream.seek(self._pos, SEEK_SET)

def seekable(self) -> bool:
"""
Expand All @@ -413,9 +457,7 @@ def seekable(self) -> bool:
Returns whether or not the stream is seekable.
"""
self._raise_if_closed()
if self._is_local_file:
return self._mode in READ_MODES
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
return self._mode in _READ_MODES

def tell(self) -> int:
"""
Expand All @@ -425,9 +467,7 @@ def tell(self) -> int:
"""
self._raise_if_closed()
self._raise_if_not_read()
if self._is_local_file:
return self._pos
raise NotImplementedError(_NON_LOCAL_PATH_ERR_MSG)
return self._pos

def truncate(self, size: int | None = None) -> int:
"""
Expand Down
7 changes: 7 additions & 0 deletions tests/mock/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from snowflake.snowpark import Session
from snowflake.snowpark.mock._connection import MockServerConnection
from tests.utils import Utils


@pytest.fixture(scope="function")
Expand All @@ -24,3 +25,9 @@ def session(mock_server_connection):

def pytest_sessionstart(session):
os.environ["SNOWPARK_LOCAL_TESTING_INTERNAL_TELEMETRY"] = "1"


@pytest.fixture(scope="function")
def tmp_stage():
tmp_stage_name = Utils.random_stage_name()
yield tmp_stage_name
Loading