Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add SQLite as an offline store for Feast #4395

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions sdk/python/feast/infra/offline_stores/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, List, Literal, Optional

from pydantic import StrictStr

from feast.infra.offline_stores.offline_store import OfflineStore
from feast.protos.feast.types.EntitySource_pb2 import EntitySource as EntitySourceProto
from feast.repo_config import FeastConfigBaseModel


class SQLiteOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for SQLite"""

type: Literal["sqlite"] = "sqlite"
connection_string: StrictStr
database: Optional[StrictStr] = None

class SQLiteOfflineStore(OfflineStore):
"""SQLite implementation of Feast offline store"""

def __init__(self, config: SQLiteOfflineStoreConfig):
self.connection_string = config.connection_string
self.database = config.database

@staticmethod
def from_config(config: SQLiteOfflineStoreConfig) -> "SQLiteOfflineStore":
"""Create SQLiteOfflineStore from config"""
return SQLiteOfflineStore(config)

@contextmanager
def _get_connection(self):
"""Context manager for database connections"""
conn = None
try:
conn = sqlite3.connect(self.connection_string)
yield conn
except sqlite3.Error as e:
raise ValueError(f"Error connecting to SQLite database: {e}")
finally:
if conn:
conn.close()

def _get_sqlite_type(self, value: Any) -> str:
"""Map Python types to SQLite types"""
if isinstance(value, (int, float)):
return "REAL"
elif isinstance(value, bool):
return "INTEGER"
else:
return "TEXT"

def write(self, table_name: str, data: List[Dict[str, Any]]):
"""Write data to SQLite table"""
if not data:
raise ValueError("No data provided for writing")

with self._get_connection() as conn:
cursor = conn.cursor()
try:
# Create table if not exists with appropriate types
columns = ", ".join([f"{key} {self._get_sqlite_type(value)}" for key, value in data[0].items()])
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})")

# Insert data
placeholders = ", ".join(["?" for _ in data[0]])
insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"
cursor.executemany(insert_query, [tuple(row.values()) for row in data])
conn.commit()
except sqlite3.Error as e:
conn.rollback()
raise ValueError(f"Error writing to SQLite database: {e}")

def read(self, table_name: str, columns: List[str], filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""Read data from SQLite table"""
with self._get_connection() as conn:
cursor = conn.cursor()
try:
query = f"SELECT {', '.join(columns)} FROM {table_name}"
params = []
if filters:
where_clause = " AND ".join([f"{key} = ?" for key in filters.keys()])
query += f" WHERE {where_clause}"
params = list(filters.values())

cursor.execute(query, params)
results = cursor.fetchall()
return [dict(zip(columns, row)) for row in results]
except sqlite3.Error as e:
raise ValueError(f"Error reading from SQLite database: {e}")

def delete(self, table_name: str, filters: Optional[Dict[str, Any]] = None):
"""Delete data from SQLite table"""
with self._get_connection() as conn:
cursor = conn.cursor()
try:
query = f"DELETE FROM {table_name}"
params = []
if filters:
where_clause = " AND ".join([f"{key} = ?" for key in filters.keys()])
query += f" WHERE {where_clause}"
params = list(filters.values())

cursor.execute(query, params)
conn.commit()
except sqlite3.Error as e:
conn.rollback()
raise ValueError(f"Error deleting from SQLite database: {e}")

def get_table(self, entity_source: EntitySourceProto) -> str:
"""Get table name from EntitySourceProto"""
if entity_source.HasField("table_ref"):
return entity_source.table_ref
elif entity_source.HasField("file_source"):
return entity_source.file_source.file_name
else:
raise ValueError("Unsupported entity source type")
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,51 @@ def get_fixtures(request, environment):
)

return config, data_source, fv

@pytest.mark.integration
@pytest.mark.universal_offline_stores(only=["sqlite"])
def test_get_historical_features_sqlite(offline_types_test_fixtures, environment):
config, data_source, fv = offline_types_test_fixtures
fs = environment.feature_store
entity = driver()
fv = driver_feature_view(
data_source=data_source,
name="get_historical_features_sqlite",
dtype=_get_feast_type(config.feature_dtype, config.feature_is_list),
)
fs.apply([fv, entity])

entity_df = pd.DataFrame()
entity_df["driver_id"] = [1, 3]
ts = pd.Timestamp(_utc_now()).round("ms")
entity_df["ts"] = [
ts - timedelta(hours=4),
ts - timedelta(hours=2),
]
features = [f"{fv.name}:value"]

historical_features = fs.get_historical_features(
entity_df=entity_df,
features=features,
)
historical_features_df = historical_features.to_df()

assert not historical_features_df.empty, "Historical features DataFrame should not be empty"
assert "value" in historical_features_df.columns, "Value column should be present in the result"

if config.feature_is_list:
assert_feature_list_types(
environment.provider,
config.feature_dtype,
historical_features_df,
)
else:
assert_expected_historical_feature_types(
config.feature_dtype, historical_features_df
)
assert_expected_arrow_types(
environment.provider,
config.feature_dtype,
config.feature_is_list,
historical_features,
)
Loading