Skip to content

Commit

Permalink
fix: Hydrate infra object in the sql registry proto() method (feast-d…
Browse files Browse the repository at this point in the history
…ev#2782)

* fix: Implement apply_materialziation and infra methods in sql registry

Signed-off-by: Achal Shah <achals@gmail.com>

* fix: hydrate infra object in the sql registry proto() method

Signed-off-by: Achal Shah <achals@gmail.com>

* rm old comment

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals authored Jun 10, 2022
1 parent 331a214 commit 452dcd3
Showing 1 changed file with 53 additions and 1 deletion.
54 changes: 53 additions & 1 deletion sdk/python/feast/infra/registry_stores/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@
)


feast_metadata = Table(
"feast_metadata",
metadata,
Column("metadata_key", String(50), primary_key=True),
Column("metadata_value", String(50), nullable=False),
Column("last_updated_timestamp", BigInteger, nullable=False),
)


class SqlRegistry(BaseRegistry):
def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
Expand Down Expand Up @@ -575,7 +584,6 @@ def get_user_metadata(
def proto(self) -> RegistryProto:
r = RegistryProto()
project = ""
# TODO(achal): Support Infra object, and last_updated_timestamp.
for lister, registry_proto_field in [
(self.list_entities, r.entities),
(self.list_feature_views, r.feature_views),
Expand All @@ -591,6 +599,11 @@ def proto(self) -> RegistryProto:
if objs:
registry_proto_field.extend([obj.to_proto() for obj in objs])

r.infra.CopyFrom(self.get_infra(project).to_proto())
last_updated_timestamp = self._get_last_updated_metadata()
if last_updated_timestamp:
r.last_updated.FromDatetime(last_updated_timestamp)

return r

def commit(self):
Expand Down Expand Up @@ -626,13 +639,15 @@ def _apply_object(self, table, id_field_name, obj, proto_field_name, name=None):
}
insert_stmt = insert(table).values(values,)
conn.execute(insert_stmt)
self._set_last_updated_metadata(update_datetime)

def _delete_object(self, table, name, project, id_field_name, not_found_exception):
with self.engine.connect() as conn:
stmt = delete(table).where(getattr(table.c, id_field_name) == name)
rows = conn.execute(stmt)
if rows.rowcount < 1 and not_found_exception:
raise not_found_exception(name, project)
self._set_last_updated_metadata(datetime.utcnow())
return rows.rowcount

def _get_object(
Expand Down Expand Up @@ -666,3 +681,40 @@ def _list_objects(self, table, proto_class, python_class, proto_field_name):
for row in rows
]
return []

def _set_last_updated_metadata(self, last_updated: datetime):
with self.engine.connect() as conn:
stmt = select(feast_metadata).where(
feast_metadata.c.metadata_key == "last_updated_timestamp"
)
row = conn.execute(stmt).first()

update_time = int(last_updated.timestamp())

values = {
"metadata_key": "last_updated_timestamp",
"metadata_value": f"{update_time}",
"last_updated_timestamp": update_time,
}
if row:
update_stmt = (
update(feast_metadata)
.where(feast_metadata.c.metadata_key == "last_updated_timestamp")
.values(values)
)
conn.execute(update_stmt)
else:
insert_stmt = insert(feast_metadata).values(values,)
conn.execute(insert_stmt)

def _get_last_updated_metadata(self):
with self.engine.connect() as conn:
stmt = select(feast_metadata).where(
feast_metadata.c.metadata_key == "last_updated_timestamp"
)
row = conn.execute(stmt).first()
if not row:
return None
update_time = int(row["last_updated_timestamp"])

return datetime.utcfromtimestamp(update_time)

0 comments on commit 452dcd3

Please sign in to comment.