Skip to content

Commit

Permalink
feat: Feast/IKV upgrade client version (#4200)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushkarmoi authored May 14, 2024
1 parent bf99640 commit 0e42150
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ def online_write_batch(
progress: Function to be called once a batch of rows is written to the online store, used
to show progress.
"""
# update should have been called before
if self._writer is None:
return
self._init_writer(config=config)
assert self._writer is not None

for entity_key, features, event_timestamp, _ in data:
entity_id: str = compute_entity_id(
Expand Down Expand Up @@ -120,6 +119,8 @@ def online_read(
item is the event timestamp for the row, and the second item is a dict mapping feature names
to values, which are returned in proto format.
"""
self._init_reader(config=config)

if not len(entity_keys):
return []

Expand Down Expand Up @@ -174,7 +175,6 @@ def _decode_fields_for_primary_key(

return dt, features

# called before any read/write requests are issued
@log_exceptions_and_usage(online_store="ikv")
def update(
self,
Expand All @@ -199,7 +199,7 @@ def update(
partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so
infrastructure corresponding to other feature views should be not be touched.
"""
self._init_clients(config=config)
self._init_writer(config=config)
assert self._writer is not None

# note: we assume tables_to_keep does not overlap with tables_to_delete
Expand All @@ -223,7 +223,7 @@ def teardown(
tables: Feature views whose corresponding infrastructure should be deleted.
entities: Entities whose corresponding infrastructure should be deleted.
"""
self._init_clients(config=config)
self._init_writer(config=config)
assert self._writer is not None

# drop fields corresponding to this feature-view
Expand Down Expand Up @@ -269,20 +269,28 @@ def _create_document(

return builder.build()

def _init_clients(self, config: RepoConfig):
"""Initializes (if required) reader/writer ikv clients."""
online_config = config.online_store
assert isinstance(online_config, IKVOnlineStoreConfig)
client_options = IKVOnlineStore._config_to_client_options(online_config)

def _init_writer(self, config: RepoConfig):
"""Initializes ikv writer client."""
# initialize writer
if self._writer is None:
online_config = config.online_store
assert isinstance(online_config, IKVOnlineStoreConfig)
client_options = IKVOnlineStore._config_to_client_options(online_config)

self._writer = create_new_writer(client_options)
self._writer.startup() # blocking operation

# initialize reader, iff mount_dir is specified
def _init_reader(self, config: RepoConfig):
"""Initializes ikv reader client."""
# initialize reader
if self._reader is None:
online_config = config.online_store
assert isinstance(online_config, IKVOnlineStoreConfig)
client_options = IKVOnlineStore._config_to_client_options(online_config)

if online_config.mount_directory and len(online_config.mount_directory) > 0:
self._reader = create_new_reader(client_options)
self._reader.startup() # blocking operation

@staticmethod
def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
]

IKV_REQUIRED = [
"ikvpy>=0.0.23",
"ikvpy>=0.0.36",
]

HAZELCAST_REQUIRED = [
Expand Down

0 comments on commit 0e42150

Please sign in to comment.