From a79dfb9bb2a68f71c4c0587937cde0183ae23113 Mon Sep 17 00:00:00 2001 From: Thomas Ma Date: Thu, 7 Jul 2022 10:36:50 -0700 Subject: [PATCH] fixed aggregatable field name tests --- eland/etl.py | 7 ++-- tests/common.py | 1 - tests/dataframe/test_datetime_pytest.py | 4 +- tests/dataframe/test_dir_pytest.py | 6 ++- tests/dataframe/test_dtypes_pytest.py | 6 ++- .../test_aggregatables_pytest.py | 39 ++++++++++--------- .../test_metric_source_fields_pytest.py | 4 -- 7 files changed, 34 insertions(+), 33 deletions(-) diff --git a/eland/etl.py b/eland/etl.py index bfc159972..326085bbe 100644 --- a/eland/etl.py +++ b/eland/etl.py @@ -21,7 +21,7 @@ import pandas as pd # type: ignore from elasticsearch import Elasticsearch -from elasticsearch.helpers import parallel_bulk +from opensearchpy.helpers import parallel_bulk from eland import DataFrame from eland.common import DEFAULT_CHUNK_SIZE, PANDAS_VERSION, ensure_es_client @@ -160,7 +160,6 @@ def pandas_to_eland( chunksize = DEFAULT_CHUNK_SIZE mapping = FieldMappings._generate_es_mappings(pd_df, es_type_overrides) - es_client = ensure_es_client(es_client) # If table exists, check if_exists parameter if es_client.indices.exists(index=es_dest_index): @@ -174,7 +173,7 @@ def pandas_to_eland( elif es_if_exists == "replace": es_client.indices.delete(index=es_dest_index) - es_client.indices.create(index=es_dest_index, mappings=mapping["mappings"]) + es_client.indices.create(index=es_dest_index, body={'mappings': mapping["mappings"]}) elif es_if_exists == "append" and es_verify_mapping_compatibility: dest_mapping = es_client.indices.get_mapping(index=es_dest_index)[ @@ -186,7 +185,7 @@ def pandas_to_eland( es_type_overrides=es_type_overrides, ) else: - es_client.indices.create(index=es_dest_index, mappings=mapping["mappings"]) + es_client.indices.create(index=es_dest_index, body={'mappings': mapping["mappings"]}) def action_generator( pd_df: pd.DataFrame, diff --git a/tests/common.py b/tests/common.py index ec56011ab..fe579197b 100644 --- a/tests/common.py +++ b/tests/common.py @@ -56,7 +56,6 @@ _pd_ecommerce["products.created_on"] = _pd_ecommerce["products.created_on"].apply( lambda x: pd.to_datetime(x) ) -_pd_ecommerce.insert(2, "customer_birth_date", None) _pd_ecommerce.index = _pd_ecommerce.index.map(str) # make index 'object' not int # need to unpack the dictionaries in the original df to accommodate OpenSearch client read procedure _pd_ecommerce['geoip.location.lon'] = _pd_ecommerce['geoip.location'].apply(pd.Series)['lon'] diff --git a/tests/dataframe/test_datetime_pytest.py b/tests/dataframe/test_datetime_pytest.py index adc8d80ab..fc7e0f03d 100644 --- a/tests/dataframe/test_datetime_pytest.py +++ b/tests/dataframe/test_datetime_pytest.py @@ -58,11 +58,11 @@ def setup_class(cls): body = {"mappings": mappings} index = "test_time_formats" - es.options(ignore_status=[400, 404]).indices.delete(index=index) + es.indices.delete(index=index, ignore_unavailable=True) es.indices.create(index=index, body=body) for i, time_formats in enumerate(time_formats_docs): - es.index(index=index, id=i, document=time_formats) + es.index(index=index, id=i, body=time_formats) es.indices.refresh(index=index) @classmethod diff --git a/tests/dataframe/test_dir_pytest.py b/tests/dataframe/test_dir_pytest.py index 7f147a3a9..6de0667d0 100644 --- a/tests/dataframe/test_dir_pytest.py +++ b/tests/dataframe/test_dir_pytest.py @@ -30,4 +30,8 @@ def test_flights_dir(self): autocomplete_attrs = dir(ed_flights) for c in ed_flights.columns: - assert c in autocomplete_attrs + # opensearch will save JSON values with sub-entries (like {A: {b: x}}) as A.b + # these are not saved in pandas as attributes as they interfere with the naming convention + # we ignore these for now + if '.' not in c: + assert c in autocomplete_attrs diff --git a/tests/dataframe/test_dtypes_pytest.py b/tests/dataframe/test_dtypes_pytest.py index 68be8d2ef..ae78061f2 100644 --- a/tests/dataframe/test_dtypes_pytest.py +++ b/tests/dataframe/test_dtypes_pytest.py @@ -47,7 +47,8 @@ def test_es_dtypes(self, testdata): "DestAirportID": "keyword", "DestCityName": "keyword", "DestCountry": "keyword", - "DestLocation": "geo_point", + "DestLocation.lat": "float", + "DestLocation.lon": "float", "DestRegion": "keyword", "DestWeather": "keyword", "DistanceKilometers": "float", @@ -62,7 +63,8 @@ def test_es_dtypes(self, testdata): "OriginAirportID": "keyword", "OriginCityName": "keyword", "OriginCountry": "keyword", - "OriginLocation": "geo_point", + "OriginLocation.lat": "float", + "OriginLocation.lon": "float", "OriginRegion": "keyword", "OriginWeather": "keyword", "dayOfWeek": "byte", diff --git a/tests/field_mappings/test_aggregatables_pytest.py b/tests/field_mappings/test_aggregatables_pytest.py index 9cd1fa80a..42cbccfea 100644 --- a/tests/field_mappings/test_aggregatables_pytest.py +++ b/tests/field_mappings/test_aggregatables_pytest.py @@ -34,21 +34,22 @@ def test_ecommerce_all_aggregatables(self): expected = { "category.keyword": "category", - "currency": "currency", - "customer_birth_date": "customer_birth_date", + 'currency.keyword': "currency", "customer_first_name.keyword": "customer_first_name", "customer_full_name.keyword": "customer_full_name", + 'customer_gender.keyword': "customer_gender", "customer_id": "customer_id", "customer_last_name.keyword": "customer_last_name", - "customer_phone": "customer_phone", - "day_of_week": "day_of_week", + "customer_phone.keyword": "customer_phone", + "day_of_week.keyword": "day_of_week", "day_of_week_i": "day_of_week_i", - "email": "email", - "geoip.city_name": "geoip.city_name", - "geoip.continent_name": "geoip.continent_name", - "geoip.country_iso_code": "geoip.country_iso_code", - "geoip.location": "geoip.location", - "geoip.region_name": "geoip.region_name", + "email.keyword": "email", + "geoip.city_name.keyword": "geoip.city_name", + "geoip.continent_name.keyword": "geoip.continent_name", + "geoip.country_iso_code.keyword": "geoip.country_iso_code", + "geoip.location.lat": "geoip.location.lat", + "geoip.location.lon": "geoip.location.lon", + "geoip.region_name.keyword": "geoip.region_name", "manufacturer.keyword": "manufacturer", "order_date": "order_date", "order_id": "order_id", @@ -65,18 +66,18 @@ def test_ecommerce_all_aggregatables(self): "products.product_id": "products.product_id", "products.product_name.keyword": "products.product_name", "products.quantity": "products.quantity", - "products.sku": "products.sku", + "products.sku.keyword": "products.sku", "products.tax_amount": "products.tax_amount", "products.taxful_price": "products.taxful_price", "products.taxless_price": "products.taxless_price", "products.unit_discount_amount": "products.unit_discount_amount", - "sku": "sku", + "sku.keyword": "sku", "taxful_total_price": "taxful_total_price", "taxless_total_price": "taxless_total_price", "total_quantity": "total_quantity", "total_unique_products": "total_unique_products", - "type": "type", - "user": "user", + "type.keyword": "type", + "user.keyword": "user", } assert expected == aggregatables @@ -84,11 +85,10 @@ def test_ecommerce_all_aggregatables(self): def test_ecommerce_selected_aggregatables(self): expected = { "category.keyword": "category", - "currency": "currency", - "customer_birth_date": "customer_birth_date", + "currency.keyword": "currency", "customer_first_name.keyword": "customer_first_name", - "type": "type", - "user": "user", + "type.keyword": "type", + "user.keyword": "user", } ed_field_mappings = FieldMappings( @@ -106,7 +106,7 @@ def test_ecommerce_single_aggregatable_field(self): client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME ) - assert "user" == ed_field_mappings.aggregatable_field_name("user") + assert "user.keyword" == ed_field_mappings.aggregatable_field_name("user") def test_ecommerce_single_keyword_aggregatable_field(self): ed_field_mappings = FieldMappings( @@ -126,6 +126,7 @@ def test_ecommerce_single_non_existant_field(self): with pytest.raises(KeyError): ed_field_mappings.aggregatable_field_name("non_existant") + @pytest.mark.skip(reason="opensearch treats all fields in ecommerce df as aggregatable") @pytest.mark.filterwarnings("ignore:Aggregations not supported") def test_ecommerce_single_non_aggregatable_field(self): ed_field_mappings = FieldMappings( diff --git a/tests/field_mappings/test_metric_source_fields_pytest.py b/tests/field_mappings/test_metric_source_fields_pytest.py index 895107958..b07e9e579 100644 --- a/tests/field_mappings/test_metric_source_fields_pytest.py +++ b/tests/field_mappings/test_metric_source_fields_pytest.py @@ -77,7 +77,6 @@ def test_ecommerce_selected_non_metric_source_fields(self): field_names = [ "category", "currency", - "customer_birth_date", "customer_first_name", "user", ] @@ -85,7 +84,6 @@ def test_ecommerce_selected_non_metric_source_fields(self): Note: non of there are metric category object currency object - customer_birth_date datetime64[ns] customer_first_name object user object """ @@ -108,7 +106,6 @@ def test_ecommerce_selected_mixed_metric_source_fields(self): field_names = [ "category", "currency", - "customer_birth_date", "customer_first_name", "total_quantity", "user", @@ -117,7 +114,6 @@ def test_ecommerce_selected_mixed_metric_source_fields(self): Note: one is metric category object currency object - customer_birth_date datetime64[ns] customer_first_name object total_quantity int64 user object