14 changes: 8 additions & 6 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -441,9 +441,11 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
with self._query_generator() as query:
df = self._execute_query(query).to_dataframe(create_bqstorage_client=True)
df = self._execute_query(query=query, timeout=timeout).to_dataframe(
create_bqstorage_client=True
)
return df

def to_sql(self) -> str:
@@ -507,15 +509,15 @@ def to_bigquery(
print(f"Done writing to '{dest}'.")
return str(dest)

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
with self._query_generator() as query:
q = self._execute_query(query=query)
q = self._execute_query(query=query, timeout=timeout)
assert q
return q.to_arrow()

@log_exceptions_and_usage
def _execute_query(
self, query, job_config=None, timeout: int = 1800
self, query, job_config=None, timeout: Optional[int] = None
) -> Optional[bigquery.job.query.QueryJob]:
bq_job = self.client.query(query, job_config=job_config)

@@ -525,7 +527,7 @@ def _execute_query(
)
return None

block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout or 1800)
return bq_job

def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
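One detail worth noting about the BigQuery change above: the caller-supplied timeout now reaches `block_until_done`, and `timeout or 1800` preserves the previous 30-minute default whenever the caller passes `None` (or `0`, since it is a truthiness check, not an `is None` check). A minimal sketch of that fallback, with `DEFAULT_BQ_TIMEOUT_SECONDS` and `resolve_timeout` as illustrative names, not code from this PR:

```python
from typing import Optional

DEFAULT_BQ_TIMEOUT_SECONDS = 1800  # previous hard-coded default in _execute_query

def resolve_timeout(timeout: Optional[int] = None) -> int:
    # Mirrors the diff's `timeout or 1800`: None (and 0) fall back to the default.
    return timeout or DEFAULT_BQ_TIMEOUT_SECONDS

assert resolve_timeout(None) == 1800
assert resolve_timeout(600) == 600
assert resolve_timeout(0) == 1800  # falsy values also hit the fallback
```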
@@ -375,7 +375,7 @@ def get_temp_table_dml_header(
return temp_table_dml_header

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
with self._query_generator() as query:
temp_table_name = "_" + str(uuid.uuid4()).replace("-", "")
temp_external_location = self.get_temp_s3_path()
@@ -392,7 +392,7 @@ def _to_df_internal(
)

@log_exceptions_and_usage
def _to_arrow_internal(self) -> pa.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
with self._query_generator() as query:
temp_table_name = "_" + str(uuid.uuid4()).replace("-", "")
temp_external_location = self.get_temp_s3_path()
@@ -327,7 +327,7 @@ def __init__(
engine: Engine,
config: MsSqlServerOfflineStoreConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
drop_columns: Optional[List[str]] = None,
):
@@ -347,10 +347,10 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pandas.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pandas.DataFrame:
return pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
result = pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)
return pyarrow.Table.from_pandas(result)

@@ -241,7 +241,7 @@ def __init__(
query: Union[str, Callable[[], ContextManager[str]]],
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
):
if not isinstance(query, str):
@@ -267,15 +267,15 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
# We use arrow format because it gives better control of the table schema
return self._to_arrow_internal().to_pandas()

def to_sql(self) -> str:
with self._query_generator() as query:
return query

def _to_arrow_internal(self) -> pa.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
with self._query_generator() as query:
with _get_conn(self.config.offline_store) as conn, conn.cursor() as cur:
conn.set_session(readonly=True)
@@ -336,13 +336,13 @@ def to_spark_df(self) -> pyspark.sql.DataFrame:
*_, last = map(self.spark_session.sql, statements)
return last

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
"""Return dataset as Pandas DataFrame synchronously"""
return self.to_spark_df().toPandas()

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
"""Return dataset as pyarrow Table synchronously"""
return pyarrow.Table.from_pandas(self._to_df_internal())
return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout))

def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
"""
@@ -85,16 +85,16 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
"""Return dataset as Pandas DataFrame synchronously including on demand transforms"""
results = self._client.execute_query(query_text=self._query)
self.pyarrow_schema = results.pyarrow_schema
return results.to_dataframe()

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
"""Return payrrow dataset as synchronously including on demand transforms"""
return pyarrow.Table.from_pandas(
self._to_df_internal(), schema=self.pyarrow_schema
self._to_df_internal(timeout=timeout), schema=self.pyarrow_schema
)

def to_sql(self) -> str:
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -76,14 +76,14 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function().compute()
df = df.reset_index(drop=True)
return df

@log_exceptions_and_usage
def _to_arrow_internal(self):
def _to_arrow_internal(self, timeout: Optional[int] = None):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function().compute()
return pyarrow.Table.from_pandas(df)
24 changes: 17 additions & 7 deletions sdk/python/feast/infra/offline_stores/offline_store.py
@@ -62,7 +62,9 @@ class RetrievalJob(ABC):
"""A RetrievalJob manages the execution of a query to retrieve data from the offline store."""

def to_df(
self, validation_reference: Optional["ValidationReference"] = None
self,
validation_reference: Optional["ValidationReference"] = None,
timeout: Optional[int] = None,
) -> pd.DataFrame:
"""
Synchronously executes the underlying query and returns the result as a pandas dataframe.
@@ -72,8 +74,9 @@ def to_df(

Args:
validation_reference (optional): The validation to apply against the retrieved dataframe.
timeout (optional): The query timeout if applicable.
"""
features_df = self._to_df_internal()
features_df = self._to_df_internal(timeout=timeout)

if self.on_demand_feature_views:
# TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
@@ -101,7 +104,9 @@ def to_df(
return features_df

def to_arrow(
self, validation_reference: Optional["ValidationReference"] = None
self,
validation_reference: Optional["ValidationReference"] = None,
timeout: Optional[int] = None,
) -> pyarrow.Table:
"""
Synchronously executes the underlying query and returns the result as an arrow table.
@@ -111,11 +116,12 @@ def to_arrow(

Args:
validation_reference (optional): The validation to apply against the retrieved dataframe.
timeout (optional): The query timeout if applicable.
"""
if not self.on_demand_feature_views and not validation_reference:
return self._to_arrow_internal()
return self._to_arrow_internal(timeout=timeout)

features_df = self._to_df_internal()
features_df = self._to_df_internal(timeout=timeout)
if self.on_demand_feature_views:
for odfv in self.on_demand_feature_views:
features_df = features_df.join(
@@ -147,20 +153,24 @@ def to_sql(self) -> str:
pass

@abstractmethod
def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
"""
Synchronously executes the underlying query and returns the result as a pandas dataframe.

timeout: RetrievalJob implementations may implement a timeout.

Does not handle on demand transformations or dataset validation. For either of those,
`to_df` should be used.
"""
pass

@abstractmethod
def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
"""
Synchronously executes the underlying query and returns the result as an arrow table.

timeout: RetrievalJob implementations may implement a timeout.

Does not handle on demand transformations or dataset validation. For either of those,
`to_arrow` should be used.
"""
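From the caller's side, the new parameter surfaces through the public `to_df` / `to_arrow` methods shown above. A hedged usage sketch — the repo path, entity dataframe, and feature reference below are illustrative, not taken from this PR:

```python
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumes an existing feature repository

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(["2022-05-01", "2022-05-02"]),
    }
)

job = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],  # illustrative feature reference
)

# New in this PR: an optional timeout (seconds) forwarded to the
# store-specific _to_df_internal / _to_arrow_internal implementations.
df = job.to_df(timeout=600)
table = job.to_arrow(timeout=600)
```

Existing callers that omit the argument are unaffected, since every implementation defaults it to `None`.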
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -400,7 +400,7 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_df(
self._redshift_client,
@@ -414,7 +414,7 @@ def _to_df_internal(self) -> pd.DataFrame:
)

@log_exceptions_and_usage
def _to_arrow_internal(self) -> pa.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_pa(
self._redshift_client,
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/snowflake.py
@@ -410,7 +410,7 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
with self._query_generator() as query:

df = execute_snowflake_statement(
@@ -419,7 +419,7 @@ def _to_df_internal(self) -> pd.DataFrame:

return df

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
with self._query_generator() as query:

pa_table = execute_snowflake_statement(
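Most stores touched here (file, MsSQL, Spark, Trino, Postgres, Redshift, Snowflake) only widen their signatures: they accept `timeout` without enforcing it, which keeps the `RetrievalJob` interface uniform while BigQuery actually honors the value. A minimal sketch of a conforming implementation under that assumption — the class name and in-memory backend are made up for illustration:

```python
from typing import Optional

import pandas as pd
import pyarrow


class InMemoryRetrievalJob:
    """Illustrative RetrievalJob-style implementation backed by a DataFrame."""

    def __init__(self, df: pd.DataFrame):
        self._df = df

    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
        # timeout is accepted for interface compatibility; a real backend
        # would forward it to its query client or job poller.
        return self._df

    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
        # Same delegation pattern as the Spark store in this diff.
        return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout))
```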