Skip to content

Commit

Permalink
chore: Clean up entity key serialization (feast-dev#3199)
Browse files Browse the repository at this point in the history
* Clean up entity key serialization

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Remove incorrect docs

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Format

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 authored Sep 8, 2022
1 parent 2535024 commit 782c759
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ def online_write_batch(
project = config.project

for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(entity_key).hex()
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)
Expand Down Expand Up @@ -184,7 +187,10 @@ def online_read(

project = config.project
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(entity_key).hex()
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
print(f"entity_key_bin: {entity_key_bin}")

cur.execute(
Expand All @@ -208,18 +214,6 @@ def online_read(
```
{% endcode %}

### 1.3 Type Mapping

Most online stores will have to perform some custom mapping of online store datatypes to Feast value types.

* The functions to implement here are `source_datatype_to_feast_value_type` and `get_column_names_and_types` in your `DataSource` class.
* `source_datatype_to_feast_value_type` is used to convert your DataSource's datatypes to Feast value types.
* `get_column_names_and_types` retrieves the column names and corresponding datasource types.

Add any helper functions for type conversion to `sdk/python/feast/type_map.py`.

* Be sure to implement the type mapping correctly so that Feast can process your feature columns without incorrect casts, which can cause loss of information or incorrect data.

## 2. Defining an OnlineStoreConfig class

Additional configuration may be needed to allow the OnlineStore to talk to the backing store. For example, MySQL may need configuration information like the host at which the MySQL instance is running, credentials for connecting to the database, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ def online_write_batch(
project = config.project
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key, entity_key_serialization_version=2
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
with tracing_span(name="remote_call"):
self._write_rows(
Expand Down Expand Up @@ -353,7 +354,8 @@ def online_read(

for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(
entity_key, entity_key_serialization_version=2
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()

with tracing_span(name="remote_call"):
Expand Down
18 changes: 14 additions & 4 deletions sdk/python/feast/infra/online_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,13 @@ def online_write_batch(

for j, (feature_name, val) in enumerate(values.items()):
df.loc[j, "entity_feature_key"] = serialize_entity_key(
entity_key, 2
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
) + bytes(feature_name, encoding="utf-8")
df.loc[j, "entity_key"] = serialize_entity_key(entity_key, 2)
df.loc[j, "entity_key"] = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
df.loc[j, "feature_name"] = feature_name
df.loc[j, "value"] = val.SerializeToString()
df.loc[j, "event_ts"] = timestamp
Expand Down Expand Up @@ -162,7 +166,10 @@ def online_read(
(
"TO_BINARY("
+ hexlify(
serialize_entity_key(combo[0], 2)
serialize_entity_key(
combo[0],
entity_key_serialization_version=config.entity_key_serialization_version,
)
+ bytes(combo[1], encoding="utf-8")
).__str__()[1:]
+ ")"
Expand All @@ -184,7 +191,10 @@ def online_read(
df = execute_snowflake_statement(conn, query).fetch_pandas_all()

for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(entity_key, 2)
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
res = {}
res_ts = None
for index, row in df[df["entity_key"] == entity_key_bin].iterrows():
Expand Down

0 comments on commit 782c759

Please sign in to comment.