Skip to content

Commit

Permalink
feat: Pull duckdb from contribs, add to CI (#4059)
Browse files Browse the repository at this point in the history
* pull duckdb/ibis from contrib folder

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

* fix linter failures

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

---------

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>
  • Loading branch information
tokoko authored Apr 8, 2024
1 parent 1ba65b4 commit 318a2b8
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 52 deletions.
12 changes: 0 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,6 @@ test-python-universal-athena:
not s3_registry and \
not test_snowflake" \
sdk/python/tests

test-python-universal-duckdb:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.duckdb_repo_configuration \
python -m pytest -n 8 --integration \
-k "not test_nullable_online_store and \
not gcs_registry and \
not s3_registry and \
not test_snowflake and \
not bigquery and \
not test_spark_materialization_consistency" \
sdk/python/tests

test-python-universal-postgres-offline:
PYTHONPATH='.' \
Expand Down
2 changes: 1 addition & 1 deletion docs/project/development-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ make lint-python
### Unit Tests
Unit tests (`pytest`) for the Feast Python SDK / CLI can run as follows:
```sh
make test-python
make test-python-unit
```

> :warning: Local configuration can interfere with Unit tests and cause them to fail:
Expand Down
2 changes: 1 addition & 1 deletion environment-setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ make install-python-ci-dependencies PYTHON=3.9
4. start the docker daemon
5. run unit tests:
```bash
make test-python
make test-python-unit
```
Empty file.

This file was deleted.

Empty file.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ibis
from pydantic import StrictStr

from feast.infra.offline_stores.contrib.ibis_offline_store.ibis import IbisOfflineStore
from feast.infra.offline_stores.ibis import IbisOfflineStore
from feast.repo_config import FeastConfigBaseModel


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ def get_historical_features(
entity_table, feature_views, event_timestamp_col
)

def read_fv(feature_view, feature_refs, full_feature_names):
def read_fv(
feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
) -> Tuple:
fv_table: Table = ibis.read_parquet(feature_view.batch_source.name)

for old_name, new_name in feature_view.batch_source.field_mapping.items():
Expand Down Expand Up @@ -175,6 +177,7 @@ def read_fv(feature_view, feature_refs, full_feature_names):
return (
fv_table,
feature_view.batch_source.timestamp_field,
feature_view.batch_source.created_timestamp_column,
feature_view.projection.join_key_map
or {e.name: e.name for e in feature_view.entity_columns},
feature_refs,
Expand Down Expand Up @@ -351,12 +354,19 @@ def metadata(self) -> Optional[RetrievalMetadata]:

def point_in_time_join(
entity_table: Table,
feature_tables: List[Tuple[Table, str, Dict[str, str], List[str], timedelta]],
feature_tables: List[Tuple[Table, str, str, Dict[str, str], List[str], timedelta]],
event_timestamp_col="event_timestamp",
):
# TODO handle ttl
all_entities = [event_timestamp_col]
for feature_table, timestamp_field, join_key_map, _, _ in feature_tables:
for (
feature_table,
timestamp_field,
created_timestamp_field,
join_key_map,
_,
_,
) in feature_tables:
all_entities.extend(join_key_map.values())

r = ibis.literal("")
Expand All @@ -371,6 +381,7 @@ def point_in_time_join(
for (
feature_table,
timestamp_field,
created_timestamp_field,
join_key_map,
feature_refs,
ttl,
Expand All @@ -395,9 +406,13 @@ def point_in_time_join(

feature_table = feature_table.drop(s.endswith("_y"))

order_by_fields = [ibis.desc(feature_table[timestamp_field])]
if created_timestamp_field:
order_by_fields.append(ibis.desc(feature_table[created_timestamp_field]))

feature_table = (
feature_table.group_by(by="entity_row_id")
.order_by(ibis.desc(feature_table[timestamp_field]))
.order_by(order_by_fields)
.mutate(rn=ibis.row_number())
)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"postgres": "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.PostgreSQLOfflineStore",
"athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
"mssql": "feast.infra.offline_stores.contrib.mssql_offline_store.mssql.MsSqlServerOfflineStore",
"duckdb": "feast.infra.offline_stores.contrib.duckdb_offline_store.duckdb.DuckDBOfflineStore",
"duckdb": "feast.infra.offline_stores.duckdb.DuckDBOfflineStore",
}

FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
BigQueryDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.file import (
DuckDBDataSourceCreator,
FileDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.redshift import (
Expand Down Expand Up @@ -108,6 +109,7 @@

AVAILABLE_OFFLINE_STORES: List[Tuple[str, Type[DataSourceCreator]]] = [
("local", FileDataSourceCreator),
("local", DuckDBDataSourceCreator),
]

AVAILABLE_ONLINE_STORES: Dict[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from feast.data_format import ParquetFormat
from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.infra.offline_stores.duckdb import DuckDBOfflineStoreConfig
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.infra.offline_stores.file_source import (
FileLoggingDestination,
Expand Down Expand Up @@ -214,3 +215,10 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
def teardown(self):
self.minio.stop()
self.f.close()


# TODO split up DataSourceCreator and OfflineStoreCreator
class DuckDBDataSourceCreator(FileDataSourceCreator):
    """File-based data source creator that supplies a DuckDB offline store config.

    Everything except the offline store configuration is inherited from
    ``FileDataSourceCreator``.
    """

    def create_offline_store_config(self):
        """Build a default ``DuckDBOfflineStoreConfig``, remember it, and return it.

        The created config is kept on ``self.duckdb_offline_store_config``.
        """
        config = DuckDBOfflineStoreConfig()
        self.duckdb_offline_store_config = config
        return config
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ def test_historical_features_with_no_ttl(

@pytest.mark.integration
@pytest.mark.universal_offline_stores
def test_historical_features_from_bigquery_sources_containing_backfills(environment):
def test_historical_features_containing_backfills(environment):
store = environment.feature_store

now = datetime.now().replace(microsecond=0, second=0, minute=0)
Expand Down
58 changes: 46 additions & 12 deletions sdk/python/tests/unit/infra/offline_stores/test_ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@

import ibis
import pyarrow as pa
import pyarrow.compute as pc

from feast.infra.offline_stores.contrib.ibis_offline_store.ibis import (
point_in_time_join,
)
from feast.infra.offline_stores.ibis import point_in_time_join


def pa_datetime(year, month, day):
Expand All @@ -16,12 +15,13 @@ def pa_datetime(year, month, day):
def customer_table():
return pa.Table.from_arrays(
arrays=[
pa.array([1, 1, 2]),
pa.array([1, 1, 2, 3]),
pa.array(
[
pa_datetime(2024, 1, 1),
pa_datetime(2024, 1, 2),
pa_datetime(2024, 1, 1),
pa_datetime(2024, 1, 3),
]
),
],
Expand All @@ -32,24 +32,38 @@ def customer_table():
def features_table_1():
return pa.Table.from_arrays(
arrays=[
pa.array([1, 1, 1, 2]),
pa.array([1, 1, 1, 2, 3, 3]),
pa.array(
[
pa_datetime(2023, 12, 31),
pa_datetime(2024, 1, 2),
pa_datetime(2024, 1, 3),
pa_datetime(2023, 1, 3),
pa_datetime(2024, 1, 1),
pa_datetime(2024, 1, 1),
]
),
pa.array([11, 22, 33, 22]),
pa.array(
[
pa_datetime(2023, 12, 31),
pa_datetime(2024, 1, 2),
pa_datetime(2024, 1, 3),
pa_datetime(2023, 1, 3),
pa_datetime(2024, 1, 3),
pa_datetime(2024, 1, 2),
]
),
pa.array([11, 22, 33, 22, 10, 20]),
],
names=["customer_id", "event_timestamp", "feature1"],
names=["customer_id", "event_timestamp", "created", "feature1"],
)


def point_in_time_join_brute(
entity_table: pa.Table,
feature_tables: List[Tuple[pa.Table, str, Dict[str, str], List[str], timedelta]],
feature_tables: List[
Tuple[pa.Table, str, str, Dict[str, str], List[str], timedelta]
],
event_timestamp_col="event_timestamp",
):
ret_fields = [entity_table.schema.field(n) for n in entity_table.schema.names]
Expand All @@ -63,6 +77,7 @@ def point_in_time_join_brute(
for (
feature_table,
timestamp_key,
created_timestamp_key,
join_key_map,
feature_refs,
ttl,
Expand All @@ -72,7 +87,9 @@ def point_in_time_join_brute(
[
feature_table.schema.field(f)
for f in feature_table.schema.names
if f not in join_key_map.values() and f != timestamp_key
if f not in join_key_map.values()
and f != timestamp_key
and f != created_timestamp_key
]
)

Expand All @@ -82,9 +99,11 @@ def check_equality(ft_dict, batch_dict, x, y):
)

ft_dict = feature_table.to_pydict()

found_matches = [
(j, ft_dict[timestamp_key][j])
for j in range(entity_table.num_rows)
(j, (ft_dict[timestamp_key][j], ft_dict[created_timestamp_key][j]))
# (j, ft_dict[timestamp_key][j])
for j in range(feature_table.num_rows)
if check_equality(ft_dict, batch_dict, j, i)
and ft_dict[timestamp_key][j] <= row_timestmap
and ft_dict[timestamp_key][j] >= row_timestmap - ttl
Expand All @@ -93,6 +112,7 @@ def check_equality(ft_dict, batch_dict, x, y):
index_found = (
max(found_matches, key=itemgetter(1))[0] if found_matches else None
)

for col in ft_dict.keys():
if col not in feature_refs:
continue
Expand All @@ -108,13 +128,26 @@ def check_equality(ft_dict, batch_dict, x, y):
return pa.Table.from_pydict(ret, schema=pa.schema(ret_fields))


def tables_equal_ignore_order(actual: pa.Table, expected: pa.Table):
    """Return whether two tables are equal when row order is disregarded.

    Each table is sorted ascending by all of its own columns, taken in
    column order, and the two sorted tables are then compared with
    ``equals``.
    """

    def _canonical(table):
        # Sort on every column so rows end up in a deterministic order.
        keys = [(column, "ascending") for column in table.column_names]
        return pc.take(table, pc.sort_indices(table, keys))

    return _canonical(actual).equals(_canonical(expected))


def test_point_in_time_join():
expected = point_in_time_join_brute(
customer_table(),
feature_tables=[
(
features_table_1(),
"event_timestamp",
"created",
{"customer_id": "customer_id"},
["feature1"],
timedelta(days=10),
Expand All @@ -128,11 +161,12 @@ def test_point_in_time_join():
(
ibis.memtable(features_table_1()),
"event_timestamp",
"created",
{"customer_id": "customer_id"},
["feature1"],
timedelta(days=10),
)
],
).to_pyarrow()

assert actual.equals(expected)
assert tables_equal_ignore_order(actual, expected)

0 comments on commit 318a2b8

Please sign in to comment.