Skip to content

Commit

Permalink
fix: Fix timestamp consistency in push api (#3614)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiwon Park <bakjeeone@hotmail.com>
  • Loading branch information
jparkzz authored May 16, 2023
1 parent bfb26c3 commit 9b227d7
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def offline_write_batch(
assert isinstance(feature_view.batch_source, BigQuerySource)

pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source(
config, feature_view.batch_source
config, feature_view.batch_source, timestamp_unit="ns"
)
if column_names != table.column_names:
raise ValueError(
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def get_offline_store_from_config(offline_store_config: Any) -> OfflineStore:


def get_pyarrow_schema_from_batch_source(
config: RepoConfig, batch_source: DataSource
config: RepoConfig, batch_source: DataSource, timestamp_unit: str = "us"
) -> Tuple[pa.Schema, List[str]]:
"""Returns the pyarrow schema and column names for the given batch source."""
column_names_and_types = batch_source.get_table_column_names_and_types(config)
Expand All @@ -244,7 +244,8 @@ def get_pyarrow_schema_from_batch_source(
(
column_name,
feast_value_type_to_pa(
batch_source.source_datatype_to_feast_value_type()(column_type)
batch_source.source_datatype_to_feast_value_type()(column_type),
timestamp_unit=timestamp_unit,
),
)
)
Expand Down
8 changes: 5 additions & 3 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,9 @@ def pg_type_to_feast_value_type(type_str: str) -> ValueType:
return value


def feast_value_type_to_pa(feast_type: ValueType) -> "pyarrow.DataType":
def feast_value_type_to_pa(
feast_type: ValueType, timestamp_unit: str = "us"
) -> "pyarrow.DataType":
import pyarrow

type_map = {
Expand All @@ -855,15 +857,15 @@ def feast_value_type_to_pa(feast_type: ValueType) -> "pyarrow.DataType":
ValueType.STRING: pyarrow.string(),
ValueType.BYTES: pyarrow.binary(),
ValueType.BOOL: pyarrow.bool_(),
ValueType.UNIX_TIMESTAMP: pyarrow.timestamp("us"),
ValueType.UNIX_TIMESTAMP: pyarrow.timestamp(timestamp_unit),
ValueType.INT32_LIST: pyarrow.list_(pyarrow.int32()),
ValueType.INT64_LIST: pyarrow.list_(pyarrow.int64()),
ValueType.DOUBLE_LIST: pyarrow.list_(pyarrow.float64()),
ValueType.FLOAT_LIST: pyarrow.list_(pyarrow.float32()),
ValueType.STRING_LIST: pyarrow.list_(pyarrow.string()),
ValueType.BYTES_LIST: pyarrow.list_(pyarrow.binary()),
ValueType.BOOL_LIST: pyarrow.list_(pyarrow.bool_()),
ValueType.UNIX_TIMESTAMP_LIST: pyarrow.list_(pyarrow.timestamp("us")),
ValueType.UNIX_TIMESTAMP_LIST: pyarrow.list_(pyarrow.timestamp(timestamp_unit)),
ValueType.NULL: pyarrow.null(),
}
return type_map[feast_type]
Expand Down

0 comments on commit 9b227d7

Please sign in to comment.