From 380cc62c0bbfbb8d86b6c37ff59494c5556f4c47 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 19 Nov 2021 20:05:49 +0000 Subject: [PATCH] Optimize memory usage during materialization (#2073) Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> --- sdk/python/feast/infra/passthrough_provider.py | 16 +++++++++++----- sdk/python/feast/infra/provider.py | 6 ++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 06a9d3a8b3..34bf4902b3 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -23,6 +23,8 @@ from feast.repo_config import RepoConfig from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute +DEFAULT_BATCH_SIZE = 10_000 + class PassthroughProvider(Provider): """ @@ -145,12 +147,16 @@ def materialize_single_feature_view( table = _run_field_mapping(table, feature_view.batch_source.field_mapping) join_keys = [entity.join_key for entity in entities] - rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - with tqdm_builder(len(rows_to_write)) as pbar: - self.online_write_batch( - self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x) - ) + with tqdm_builder(table.num_rows) as pbar: + for batch in table.to_batches(DEFAULT_BATCH_SIZE): + rows_to_write = _convert_arrow_to_proto(batch, feature_view, join_keys) + self.online_write_batch( + self.repo_config, + feature_view, + rows_to_write, + lambda x: pbar.update(x), + ) def get_historical_features( self, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 5546984f71..2bf8f6329e 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -285,7 +285,9 @@ def _run_field_mapping( def _convert_arrow_to_proto( - table: pyarrow.Table, feature_view: FeatureView, join_keys: List[str], + table: Union[pyarrow.Table, pyarrow.RecordBatch], + feature_view: FeatureView, + join_keys: List[str], ) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: rows_to_write = [] @@ -305,7 +307,7 @@ def _coerce_datetime(ts): else: return ts - column_names_idx = {k: i for i, k in enumerate(table.column_names)} + column_names_idx = {field.name: i for i, field in enumerate(table.schema)} for row in zip(*table.to_pydict().values()): entity_key = EntityKeyProto() for join_key in join_keys: