Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update the Pydantic from v1 to v2 #3948

Merged
merged 10 commits into from
Feb 15, 2024
3 changes: 1 addition & 2 deletions sdk/python/feast/importer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import importlib
from typing import Optional

from feast.errors import (
FeastClassImportError,
Expand All @@ -8,7 +7,7 @@
)


def import_class(module_name: str, class_name: str, class_type: Optional[str] = None):
def import_class(module_name: str, class_name: str, class_type: str = ""):
"""
Dynamically loads and returns a class from a module.

Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from types import MethodType
from typing import List, Optional
from typing import List, Optional, no_type_check

import pandas as pd
from pyspark.sql import DataFrame, SparkSession
Expand Down Expand Up @@ -76,6 +76,8 @@ def ingest_stream_feature_view(
online_store_query = self._write_stream_data(transformed_df, to)
return online_store_query

# In the line 64 of __init__(), the "data_source" is assigned a stream_source (and has to be KafkaSource as in line 40).
@no_type_check
def _ingest_stream_data(self) -> StreamTable:
"""Only supports json and avro formats currently."""
if self.format == "json":
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/contrib/stream_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC
from abc import ABC, abstractmethod
from types import MethodType
from typing import TYPE_CHECKING, Optional

Expand Down Expand Up @@ -50,19 +50,22 @@ def __init__(
self.sfv = sfv
self.data_source = data_source

@abstractmethod
def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
"""
Ingests data from the stream source attached to the stream feature view; transforms the data
and then persists it to the online store and/or offline store, depending on the 'to' parameter.
"""
raise NotImplementedError

@abstractmethod
def _ingest_stream_data(self) -> StreamTable:
"""
Ingests data into a StreamTable.
"""
raise NotImplementedError

@abstractmethod
def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
"""
Applies transformations on top of StreamTable object. Since stream engines use lazy
Expand All @@ -71,6 +74,7 @@ def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
"""
raise NotImplementedError

@abstractmethod
def _write_stream_data(self, table: StreamTable, to: PushMode) -> None:
"""
Launches a job to persist stream data to the online store and/or offline store, depending
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Literal

from pydantic import StrictBool, StrictStr
from pydantic.typing import Literal

from feast.infra.feature_servers.base_config import BaseFeatureServerConfig

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/feature_servers/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ class BaseFeatureServerConfig(FeastConfigBaseModel):
enabled: StrictBool = False
"""Whether the feature server should be launched."""

feature_logging: Optional[FeatureLoggingConfig]
feature_logging: Optional[FeatureLoggingConfig] = None
""" Feature logging configuration """
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Literal

from pydantic import StrictBool
from pydantic.typing import Literal

from feast.infra.feature_servers.base_config import BaseFeatureServerConfig

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pydantic.typing import Literal
from typing import Literal

from feast.infra.feature_servers.base_config import BaseFeatureServerConfig

Expand Down
6 changes: 2 additions & 4 deletions sdk/python/feast/infra/materialization/snowflake_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import click
import pandas as pd
from colorama import Fore, Style
from pydantic import Field, StrictStr
from pydantic import ConfigDict, Field, StrictStr
from pytz import utc
from tqdm import tqdm

Expand Down Expand Up @@ -72,9 +72,7 @@ class SnowflakeMaterializationEngineConfig(FeastConfigBaseModel):

schema_: Optional[str] = Field("PUBLIC", alias="schema")
""" Snowflake schema name """

class Config:
allow_population_by_field_name = True
model_config = ConfigDict(populate_by_name=True)


@dataclass
Expand Down
22 changes: 10 additions & 12 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Dict,
Iterator,
List,
Literal,
Optional,
Tuple,
Union,
Expand All @@ -19,8 +20,7 @@
import pandas as pd
import pyarrow
import pyarrow.parquet
from pydantic import ConstrainedStr, StrictStr, validator
from pydantic.typing import Literal
from pydantic import StrictStr, field_validator
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

from feast import flags_helper
Expand Down Expand Up @@ -72,13 +72,6 @@ def get_http_client_info():
return http_client_info.ClientInfo(user_agent=get_user_agent())


class BigQueryTableCreateDisposition(ConstrainedStr):
"""Custom constraint for table_create_disposition. To understand more, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""

values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}


class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for GCP BigQuery"""

Expand All @@ -102,10 +95,15 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
gcs_staging_location: Optional[str] = None
""" (optional) GCS location used for offloading BigQuery results as parquet files."""

table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""
table_create_disposition: Literal[
"CREATE_NEVER", "CREATE_IF_NEEDED"
] = "CREATE_IF_NEEDED"
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED.
Custom constraint for table_create_disposition. To understand more, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition
"""

@validator("billing_project_id")
@field_validator("billing_project_id")
def project_id_exists(cls, v, values, **kwargs):
if v and not values["project_id"]:
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Dict,
Iterator,
List,
Literal,
Optional,
Tuple,
Union,
Expand All @@ -18,7 +19,6 @@
import pyarrow
import pyarrow as pa
from pydantic import StrictStr
from pydantic.typing import Literal
from pytz import utc

from feast import OnDemandFeatureView
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
timestamp_field: Optional[str] = "ts",
) -> DataSource:

table_name = destination_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
import warnings
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

import numpy as np
import pandas
import pyarrow
import pyarrow as pa
import sqlalchemy
from pydantic.types import StrictStr
from pydantic.typing import Literal
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
Expand All @@ -32,7 +31,7 @@
from feast.infra.provider import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import FeastBaseModel, RepoConfig
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.type_map import pa_to_mssql_type
from feast.usage import log_exceptions_and_usage
Expand All @@ -43,7 +42,7 @@
EntitySchema = Dict[str, np.dtype]


class MsSqlServerOfflineStoreConfig(FeastBaseModel):
class MsSqlServerOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for SQL Server"""

type: Literal["mssql"] = "mssql"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
**kwargs,
timestamp_field: Optional[str] = "ts",
) -> DataSource:
# Make sure the field mapping is correct and convert the datetime datasources.
if timestamp_field in df:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Iterator,
KeysView,
List,
Literal,
Optional,
Tuple,
Union,
Expand All @@ -19,7 +20,6 @@
import pyarrow as pa
from jinja2 import BaseLoader, Environment
from psycopg2 import sql
from pydantic.typing import Literal
from pytz import utc

from feast.data_source import DataSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
timestamp_field: Optional[str] = "ts",
) -> DataSource:
destination_name = self.get_prefixed_table_name(destination_name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pyspark.sql import SparkSession

from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
SparkOfflineStoreConfig,
)
Expand Down Expand Up @@ -68,10 +69,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
**kwargs,
timestamp_field: Optional[str] = "ts",
) -> DataSource:
if timestamp_field in df:
df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True)
Expand Down Expand Up @@ -119,3 +120,7 @@ def create_saved_dataset_destination(self) -> SavedDatasetSparkStorage:

def get_prefixed_table_name(self, suffix: str) -> str:
return f"{self.project_name}_{suffix}"

def create_logged_features_destination(self) -> LoggingDestination:
# No implementation of LoggingDestination for Spark offline store.
return None # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(
provider="local",
offline_store_creator=TrinoSourceCreator,
offline_store_creator=TrinoSourceCreator, # type: ignore
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
timestamp_field: Optional[str] = "ts",
) -> DataSource:
destination_name = self.get_prefixed_table_name(destination_name)
self.client.execute_query(
Expand Down Expand Up @@ -128,4 +128,6 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
catalog="memory",
dataset=self.project_name,
connector={"type": "memory"},
user="test",
auth=None,
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
import pandas as pd
import pyarrow
from pydantic import Field, FilePath, SecretStr, StrictBool, StrictStr, root_validator
from pydantic import Field, FilePath, SecretStr, StrictBool, StrictStr, model_validator
from trino.auth import (
BasicAuthentication,
CertificateAuthentication,
Expand Down Expand Up @@ -98,14 +98,14 @@ class AuthConfig(FeastConfigBaseModel):
type: Literal["kerberos", "basic", "jwt", "oauth2", "certificate"]
config: Optional[Dict[StrictStr, Any]]

@root_validator
def config_only_nullable_for_oauth2(cls, values):
auth_type = values["type"]
auth_config = values["config"]
@model_validator(mode="after")
def config_only_nullable_for_oauth2(self):
auth_type = self.type
auth_config = self.config
if auth_type != "oauth2" and auth_config is None:
raise ValueError(f"config cannot be null for auth type '{auth_type}'")

return values
return self

def to_trino_auth(self):
auth_type = self.type
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, List, Optional, Tuple, Union
from typing import Any, Callable, List, Literal, Optional, Tuple, Union

import dask.dataframe as dd
import pandas as pd
import pyarrow
import pyarrow.dataset
import pyarrow.parquet
import pytz
from pydantic.typing import Literal

from feast.data_source import DataSource
from feast.errors import (
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ def to_arrow(

return pyarrow.Table.from_pandas(features_df)

def to_sql(self) -> str:
def to_sql(self) -> str: # type: ignore
shuchu marked this conversation as resolved.
Show resolved Hide resolved
"""
Return RetrievalJob generated SQL statement if applicable.
"""
raise NotImplementedError
pass

def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
"""
Expand Down
Loading
Loading