Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions dlt/destinations/impl/snowflake/configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import dataclasses
import os
from pathlib import Path
from typing import Final, Optional, Any, Dict, ClassVar, List

Expand Down Expand Up @@ -48,6 +49,23 @@ def parse_native_representation(self, native_value: Any) -> None:
setattr(self, param, self.query.get(param))

def on_resolved(self) -> None:
# Auto-detect Snowpark Container Services token
token_path = "/snowflake/session/token"
if (
not self.token
and os.path.exists(token_path)
):
try:
with open(token_path, "r") as f:
self.token = f.read().strip()
# Use env vars if host not set
self.host = self.host or os.getenv("SNOWFLAKE_HOST") or os.getenv("SNOWFLAKE_ACCOUNT")
self.authenticator = self.authenticator or "oauth"
except Exception as ex:
raise ConfigurationValueError(
f"Failed to read Snowpark Container Services token: {ex}"
)

if self.private_key_path:
try:
self.private_key = Path(self.private_key_path).read_text("ascii")
Expand All @@ -56,10 +74,12 @@ def on_resolved(self) -> None:
"Make sure that `private_key` in dlt recognized format is at"
f" `{self.private_key_path}`. Note that binary formats are not supported"
)
if not self.password and not self.private_key and not self.authenticator:

# Update validation to include token authentication
if not (self.password or self.private_key or self.authenticator or self.token):
raise ConfigurationValueError(
"`SnowflakeCredentials` requires one of the following to be specified: `password`,"
" `private_key`, `authenticator` (OAuth2)."
" `private_key`, `authenticator` (OAuth2), or a Snowpark Container Services token."
)

def get_query(self) -> Dict[str, Any]:
Expand All @@ -70,21 +90,31 @@ def get_query(self) -> Dict[str, Any]:
return query

def to_connector_params(self) -> Dict[str, Any]:
# gather all params in query
query = self.get_query()
if self.private_key:
query["private_key"] = decode_private_key(self.private_key, self.private_key_passphrase)

# we do not want passphrase to be passed
query.pop("private_key_passphrase", None)

conn_params: Dict[str, Any] = dict(
query,
user=self.username,
password=self.password,
account=self.host,
database=self.database,
)
# Support token-based authentication
if self.token:
conn_params = dict(
query,
token=self.token,
account=self.host,
authenticator=self.authenticator or "oauth",
database=self.database,
)
# Remove user/password when using token auth
conn_params.pop("user", None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conn_params.pop("password", None)
else:
conn_params = dict(
query,
user=self.username,
password=self.password,
account=self.host,
database=self.database,
)

if self.application != "" and "application" not in conn_params:
conn_params["application"] = self.application
Expand Down
63 changes: 63 additions & 0 deletions docs/website/docs/destinations/snowflake_config.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Snowflake Configuration

## Authentication Methods

Snowflake destination supports several authentication methods:

### Username and Password
```python
pipeline = dlt.pipeline(
pipeline_name="my_pipeline",
destination="snowflake",
credentials={
"username": "my_user",
"password": "my_password",
"account": "my_account"
}
)
```

### Private Key
```python
pipeline = dlt.pipeline(
pipeline_name="my_pipeline",
destination="snowflake",
credentials={
"username": "my_user",
"private_key_path": "path/to/rsa_key.p8",
"account": "my_account"
}
)
```

### Snowpark Container Services

When running inside Snowpark Container Services, dlt will automatically detect and use the mounted OAuth token and environment variables:

```python
pipeline = dlt.pipeline(
pipeline_name="my_pipeline",
destination="snowflake",
dataset_name="my_dataset"
)

# No credentials needed - dlt will automatically use:
# - Token from /snowflake/session/token
# - Account from SNOWFLAKE_HOST or SNOWFLAKE_ACCOUNT env vars
```

You can still provide explicit credentials to override the auto-detection:

```python
pipeline = dlt.pipeline(
pipeline_name="my_pipeline",
destination="snowflake",
credentials={
"token": "my_token", # Override container token
"account": "my_account" # Override SNOWFLAKE_HOST
}
)
```

## Additional Configuration
# ...existing documentation...
60 changes: 60 additions & 0 deletions tests/destinations/test_snowflake_snowpark_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import os
import tempfile
from pathlib import Path
from dlt.destinations.impl.snowflake.configuration import SnowflakeCredentials

def test_snowpark_token(monkeypatch):
"""Test Snowpark Container Services token authentication"""
with tempfile.NamedTemporaryFile("w+", delete=False) as tf:
tf.write("test-token")
tf.flush()
token_path = tf.name

# Set up test environment
monkeypatch.setenv("SNOWFLAKE_HOST", "test-account")
monkeypatch.setenv("SNOWFLAKE_ACCOUNT", "test-account")

# Mock token path existence and file reading
monkeypatch.setattr("os.path.exists", lambda p: p == token_path)
monkeypatch.setattr("builtins.open", lambda p, mode="r": open(token_path, mode))

# Test credentials resolution
creds = SnowflakeCredentials()
creds.on_resolved.__globals__["token_path"] = token_path
creds.on_resolved()

assert creds.token == "test-token"
assert creds.host == "test-account"
assert creds.authenticator == "oauth"

# Test connector parameters
params = creds.to_connector_params()
assert params["token"] == "test-token"
assert params["account"] == "test-account"
assert params["authenticator"] == "oauth"
assert "user" not in params
assert "password" not in params

# Cleanup
Path(token_path).unlink()

def test_explicit_credentials_preferred(monkeypatch):
"""Test that explicit credentials are preferred over Snowpark token"""
with tempfile.NamedTemporaryFile("w+", delete=False) as tf:
tf.write("test-token")
tf.flush()
token_path = tf.name

# Test with explicit credentials
creds = SnowflakeCredentials(
token="explicit-token",
host="explicit-account"
)
creds.on_resolved.__globals__["token_path"] = token_path
creds.on_resolved()

assert creds.token == "explicit-token"
assert creds.host == "explicit-account"

# Cleanup
Path(token_path).unlink()
Loading