feat: Update Pydantic from v1 to v2 #3948

Merged · 10 commits · Feb 15, 2024
feat: fix the Python lint check errors.
Signed-off-by: Shuchu Han <shuchu.han@gmail.com>
shuchu committed Feb 12, 2024
commit 62a7f1fa5e1e6c4f7b9f46bc5d5b220b199e9d34
2 changes: 1 addition & 1 deletion sdk/python/feast/importer.py
@@ -8,7 +8,7 @@
)


-def import_class(module_name: str, class_name: str, class_type: Optional[str] = None):
+def import_class(module_name: str, class_name: str, class_type: str = ""):
"""
Dynamically loads and returns a class from a module.

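The fix swaps `Optional[str] = None` for `str = ""` so `class_type` is always a plain `str` and mypy never demands a `None` check before the value is used. A minimal sketch of the pattern (a hypothetical standalone version; the error handling is illustrative, not Feast's exact code):

```python
import importlib


def import_class(module_name: str, class_name: str, class_type: str = ""):
    # With a plain str default, the value can be used directly in the
    # message below without narrowing an Optional first.
    module = importlib.import_module(module_name)
    try:
        return getattr(module, class_name)
    except AttributeError:
        kind = class_type or "class"
        raise ImportError(f"Could not find {kind} '{class_name}' in module '{module_name}'")
```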
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -1,5 +1,5 @@
from types import MethodType
-from typing import List, Optional
+from typing import List, Optional, no_type_check

import pandas as pd
from pyspark.sql import DataFrame, SparkSession
@@ -76,6 +76,8 @@ def ingest_stream_feature_view(
online_store_query = self._write_stream_data(transformed_df, to)
return online_store_query

+    # In line 64 of __init__(), "data_source" is assigned a stream_source (and has to be a KafkaSource, as in line 40).
+    @no_type_check
def _ingest_stream_data(self) -> StreamTable:
"""Only supports json and avro formats currently."""
if self.format == "json":
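`@no_type_check` makes the checker skip the decorated function entirely, a pragmatic escape hatch when an attribute's runtime type (here a KafkaSource, per the comment above) is narrower than its declared type. A self-contained sketch with invented stand-in classes:

```python
from typing import no_type_check


class DataSource:
    """Broad base type, as declared on the processor."""


class KafkaSource(DataSource):
    message_format = "json"


class Processor:
    def __init__(self, data_source: DataSource):
        # At runtime this is always a KafkaSource, but the attribute's
        # declared type stays the broader DataSource.
        self.data_source = data_source

    @no_type_check
    def _ingest(self) -> str:
        # Without the decorator, mypy flags message_format as missing
        # on DataSource; @no_type_check suppresses checks in this body.
        return self.data_source.message_format


print(Processor(KafkaSource())._ingest())  # json
```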
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/contrib/stream_processor.py
@@ -1,4 +1,4 @@
-from abc import ABC
+from abc import ABC, abstractmethod
from types import MethodType
from typing import TYPE_CHECKING, Optional

@@ -50,19 +50,22 @@ def __init__(
self.sfv = sfv
self.data_source = data_source

+@abstractmethod
def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
"""
Ingests data from the stream source attached to the stream feature view; transforms the data
and then persists it to the online store and/or offline store, depending on the 'to' parameter.
"""
raise NotImplementedError

+@abstractmethod
def _ingest_stream_data(self) -> StreamTable:
"""
Ingests data into a StreamTable.
"""
raise NotImplementedError

+@abstractmethod
def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
"""
Applies transformations on top of StreamTable object. Since stream engines use lazy
@@ -71,6 +74,7 @@ def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
"""
raise NotImplementedError

+@abstractmethod
def _write_stream_data(self, table: StreamTable, to: PushMode) -> None:
"""
Launches a job to persist stream data to the online store and/or offline store, depending
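Marking these methods with `@abstractmethod` moves the failure from call time (`raise NotImplementedError`) to construction time: a subclass that forgets an override cannot be instantiated at all. A minimal sketch:

```python
from abc import ABC, abstractmethod


class StreamProcessor(ABC):
    @abstractmethod
    def ingest_stream_feature_view(self) -> None:
        """Subclasses must provide the ingestion logic."""
        raise NotImplementedError


class IncompleteProcessor(StreamProcessor):
    pass  # forgot to override ingest_stream_feature_view


try:
    IncompleteProcessor()
except TypeError as e:
    # "Can't instantiate abstract class IncompleteProcessor ..."
    print(e)
```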
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/feature_servers/base_config.py
@@ -30,5 +30,5 @@ class BaseFeatureServerConfig(FeastConfigBaseModel):
enabled: StrictBool = False
"""Whether the feature server should be launched."""

-feature_logging: Optional[FeatureLoggingConfig]
+feature_logging: Optional[FeatureLoggingConfig] = None
""" Feature logging configuration """
@@ -43,15 +43,16 @@ def __init__(self, project_name: str, *args, **kwargs):
workgroup=workgroup,
s3_staging_location=f"s3://{bucket_name}/test_dir",
)
-self,

def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
+timestamp_field: Optional[str] = "ts",
) -> DataSource:

table_name = destination_name
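The same signature change repeats across the test data-source creators in this commit: the old `timestamp_field="ts"` slot becomes `event_timestamp_column="ts"`, stray `**kwargs` catch-alls go away, and an explicitly typed `timestamp_field: Optional[str] = "ts"` is appended at the end, so every creator exposes one keyword-compatible signature. A standalone sketch of the new shape (a free function with an illustrative body, not Feast's implementation):

```python
from typing import Dict, Optional

import pandas as pd


def create_data_source(
    df: pd.DataFrame,
    destination_name: str,
    suffix: Optional[str] = None,
    event_timestamp_column: str = "ts",
    created_timestamp_column: str = "created_ts",
    field_mapping: Optional[Dict[str, str]] = None,
    timestamp_field: Optional[str] = "ts",
) -> str:
    # Normalize the timestamp column before handing the frame to a store.
    if timestamp_field and timestamp_field in df:
        df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True)
    return destination_name


df = pd.DataFrame({"ts": ["2024-02-12"], "value": [1]})
# Keyword arguments keep call sites stable even as parameters are reordered.
create_data_source(df, "driver_stats", timestamp_field="ts")
```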
@@ -31,7 +31,7 @@
from feast.infra.provider import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
-from feast.repo_config import FeastBaseModel, RepoConfig
+from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.type_map import pa_to_mssql_type
from feast.usage import log_exceptions_and_usage
@@ -42,7 +42,7 @@
EntitySchema = Dict[str, np.dtype]


-class MsSqlServerOfflineStoreConfig(FeastBaseModel):
+class MsSqlServerOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for SQL Server"""

type: Literal["mssql"] = "mssql"
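The import swap tracks a rename in `feast.repo_config`: offline store configs now extend `FeastConfigBaseModel`. A minimal sketch of the config-class pattern, assuming the base behaves like an ordinary Pydantic model (`connection_string` is invented for illustration):

```python
from typing import Literal, Optional

from pydantic import BaseModel


class FeastConfigBaseModel(BaseModel):
    """Stand-in for feast.repo_config.FeastConfigBaseModel."""


class MsSqlServerOfflineStoreConfig(FeastConfigBaseModel):
    type: Literal["mssql"] = "mssql"
    connection_string: Optional[str] = None  # illustrative field


cfg = MsSqlServerOfflineStoreConfig()
print(cfg.type)  # mssql
```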
@@ -64,10 +64,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
-**kwargs,
+timestamp_field: Optional[str] = "ts",
) -> DataSource:
# Make sure the field mapping is correct and convert the datetime datasources.
if timestamp_field in df:
@@ -82,10 +82,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
+timestamp_field: Optional[str] = "ts",
) -> DataSource:
destination_name = self.get_prefixed_table_name(destination_name)

@@ -68,10 +68,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
-**kwargs,
+timestamp_field: Optional[str] = "ts",
) -> DataSource:
if timestamp_field in df:
df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True)
@@ -8,6 +8,6 @@
FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(
provider="local",
-offline_store_creator=TrinoSourceCreator,
+offline_store_creator=TrinoSourceCreator,  # type: ignore
),
]
@@ -81,10 +81,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
+timestamp_field: Optional[str] = "ts",
) -> DataSource:
destination_name = self.get_prefixed_table_name(destination_name)
self.client.execute_query(
@@ -128,4 +128,6 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
catalog="memory",
dataset=self.project_name,
connector={"type": "memory"},
user="test",
auth=None,
)
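Passing user="test" and auth=None explicitly looks like another consequence of the v2 default rules sketched earlier: if TrinoOfflineStoreConfig declares these fields without defaults, Pydantic v2 treats them as required, so the test config has to supply them.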
10 changes: 6 additions & 4 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
@@ -1,5 +1,5 @@
import warnings
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, no_type_check

from typeguard import typechecked

@@ -202,6 +202,7 @@ def get_table_query_string(self) -> str:
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.snowflake_type_to_feast_value_type

+@no_type_check
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
@@ -279,20 +280,21 @@ def get_table_column_names_and_types(
else:
row["snowflake_type"] = "NUMBERwSCALE"

elif row["type_code"] in [5, 9, 12]:
elif row["type_code"] in {5, 9, 12}:
error = snowflake_unsupported_map[row["type_code"]]
raise NotImplementedError(
f"The following Snowflake Data Type is not supported: {error}"
)
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 10, 11, 13]:
elif row["type_code"] in {1, 2, 3, 4, 6, 7, 8, 10, 11, 13}:
row["snowflake_type"] = snowflake_type_code_map[row["type_code"]]
else:
raise NotImplementedError(
f"The following Snowflake Column is not supported: {row['column_name']} (type_code: {row['type_code']})"
)

return [
(column["column_name"], column["snowflake_type"]) for column in metadata
(str(column["column_name"]), str(column["snowflake_type"]))
+for column in metadata
]
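Two lint-driven idioms land in this hunk: constant membership tests use set literals (`{5, 9, 12}`) instead of lists, and the returned pairs are wrapped in `str()` so the declared `Iterable[Tuple[str, str]]` holds regardless of what the cursor metadata actually contains. A compact sketch (the type-code map is an invented subset, not Snowflake's real table):

```python
from typing import Iterable, List, Tuple

snowflake_type_code_map = {0: "NUMBER", 2: "VARCHAR"}  # invented subset


def column_types(metadata: List[dict]) -> Iterable[Tuple[str, str]]:
    for row in metadata:
        # CPython compiles the set literal to a frozenset constant,
        # and linters prefer sets for constant membership tests.
        if row["type_code"] in {5, 9, 12}:
            raise NotImplementedError("unsupported Snowflake type")
        # str() guarantees the advertised Tuple[str, str] element type.
        yield str(row["column_name"]), str(snowflake_type_code_map[row["type_code"]])


print(list(column_types([{"column_name": "driver_id", "type_code": 0}])))
```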

