Skip to content

Commit

Permalink
Merge pull request #2 from LEFTA98/dimension-fixes
Browse files Browse the repository at this point in the history
fixed aggregatable field name tests
  • Loading branch information
LEFTA98 authored Jul 7, 2022
2 parents 38a0ec9 + a79dfb9 commit 0b89f1f
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 33 deletions.
7 changes: 3 additions & 4 deletions eland/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import pandas as pd # type: ignore
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
from opensearchpy.helpers import parallel_bulk

from eland import DataFrame
from eland.common import DEFAULT_CHUNK_SIZE, PANDAS_VERSION, ensure_es_client
Expand Down Expand Up @@ -160,7 +160,6 @@ def pandas_to_eland(
chunksize = DEFAULT_CHUNK_SIZE

mapping = FieldMappings._generate_es_mappings(pd_df, es_type_overrides)
es_client = ensure_es_client(es_client)

# If table exists, check if_exists parameter
if es_client.indices.exists(index=es_dest_index):
Expand All @@ -174,7 +173,7 @@ def pandas_to_eland(

elif es_if_exists == "replace":
es_client.indices.delete(index=es_dest_index)
es_client.indices.create(index=es_dest_index, mappings=mapping["mappings"])
es_client.indices.create(index=es_dest_index, body={'mappings': mapping["mappings"]})

elif es_if_exists == "append" and es_verify_mapping_compatibility:
dest_mapping = es_client.indices.get_mapping(index=es_dest_index)[
Expand All @@ -186,7 +185,7 @@ def pandas_to_eland(
es_type_overrides=es_type_overrides,
)
else:
es_client.indices.create(index=es_dest_index, mappings=mapping["mappings"])
es_client.indices.create(index=es_dest_index, body={'mappings': mapping["mappings"]})

def action_generator(
pd_df: pd.DataFrame,
Expand Down
1 change: 0 additions & 1 deletion tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
_pd_ecommerce["products.created_on"] = _pd_ecommerce["products.created_on"].apply(
lambda x: pd.to_datetime(x)
)
_pd_ecommerce.insert(2, "customer_birth_date", None)
_pd_ecommerce.index = _pd_ecommerce.index.map(str) # make index 'object' not int
# need to unpack the dictionaries in the original df to accommodate OpenSearch client read procedure
_pd_ecommerce['geoip.location.lon'] = _pd_ecommerce['geoip.location'].apply(pd.Series)['lon']
Expand Down
4 changes: 2 additions & 2 deletions tests/dataframe/test_datetime_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ def setup_class(cls):

body = {"mappings": mappings}
index = "test_time_formats"
es.options(ignore_status=[400, 404]).indices.delete(index=index)
es.indices.delete(index=index, ignore_unavailable=True)
es.indices.create(index=index, body=body)

for i, time_formats in enumerate(time_formats_docs):
es.index(index=index, id=i, document=time_formats)
es.index(index=index, id=i, body=time_formats)
es.indices.refresh(index=index)

@classmethod
Expand Down
6 changes: 5 additions & 1 deletion tests/dataframe/test_dir_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ def test_flights_dir(self):
autocomplete_attrs = dir(ed_flights)

for c in ed_flights.columns:
assert c in autocomplete_attrs
# opensearch will save JSON values with sub-entries (like {A: {b: x}}) as A.b
# these are not saved in pandas as attributes as they interfere with the naming convention
# we ignore these for now
if '.' not in c:
assert c in autocomplete_attrs
6 changes: 4 additions & 2 deletions tests/dataframe/test_dtypes_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def test_es_dtypes(self, testdata):
"DestAirportID": "keyword",
"DestCityName": "keyword",
"DestCountry": "keyword",
"DestLocation": "geo_point",
"DestLocation.lat": "float",
"DestLocation.lon": "float",
"DestRegion": "keyword",
"DestWeather": "keyword",
"DistanceKilometers": "float",
Expand All @@ -62,7 +63,8 @@ def test_es_dtypes(self, testdata):
"OriginAirportID": "keyword",
"OriginCityName": "keyword",
"OriginCountry": "keyword",
"OriginLocation": "geo_point",
"OriginLocation.lat": "float",
"OriginLocation.lon": "float",
"OriginRegion": "keyword",
"OriginWeather": "keyword",
"dayOfWeek": "byte",
Expand Down
39 changes: 20 additions & 19 deletions tests/field_mappings/test_aggregatables_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,22 @@ def test_ecommerce_all_aggregatables(self):

expected = {
"category.keyword": "category",
"currency": "currency",
"customer_birth_date": "customer_birth_date",
'currency.keyword': "currency",
"customer_first_name.keyword": "customer_first_name",
"customer_full_name.keyword": "customer_full_name",
'customer_gender.keyword': "customer_gender",
"customer_id": "customer_id",
"customer_last_name.keyword": "customer_last_name",
"customer_phone": "customer_phone",
"day_of_week": "day_of_week",
"customer_phone.keyword": "customer_phone",
"day_of_week.keyword": "day_of_week",
"day_of_week_i": "day_of_week_i",
"email": "email",
"geoip.city_name": "geoip.city_name",
"geoip.continent_name": "geoip.continent_name",
"geoip.country_iso_code": "geoip.country_iso_code",
"geoip.location": "geoip.location",
"geoip.region_name": "geoip.region_name",
"email.keyword": "email",
"geoip.city_name.keyword": "geoip.city_name",
"geoip.continent_name.keyword": "geoip.continent_name",
"geoip.country_iso_code.keyword": "geoip.country_iso_code",
"geoip.location.lat": "geoip.location.lat",
"geoip.location.lon": "geoip.location.lon",
"geoip.region_name.keyword": "geoip.region_name",
"manufacturer.keyword": "manufacturer",
"order_date": "order_date",
"order_id": "order_id",
Expand All @@ -65,30 +66,29 @@ def test_ecommerce_all_aggregatables(self):
"products.product_id": "products.product_id",
"products.product_name.keyword": "products.product_name",
"products.quantity": "products.quantity",
"products.sku": "products.sku",
"products.sku.keyword": "products.sku",
"products.tax_amount": "products.tax_amount",
"products.taxful_price": "products.taxful_price",
"products.taxless_price": "products.taxless_price",
"products.unit_discount_amount": "products.unit_discount_amount",
"sku": "sku",
"sku.keyword": "sku",
"taxful_total_price": "taxful_total_price",
"taxless_total_price": "taxless_total_price",
"total_quantity": "total_quantity",
"total_unique_products": "total_unique_products",
"type": "type",
"user": "user",
"type.keyword": "type",
"user.keyword": "user",
}

assert expected == aggregatables

def test_ecommerce_selected_aggregatables(self):
expected = {
"category.keyword": "category",
"currency": "currency",
"customer_birth_date": "customer_birth_date",
"currency.keyword": "currency",
"customer_first_name.keyword": "customer_first_name",
"type": "type",
"user": "user",
"type.keyword": "type",
"user.keyword": "user",
}

ed_field_mappings = FieldMappings(
Expand All @@ -106,7 +106,7 @@ def test_ecommerce_single_aggregatable_field(self):
client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME
)

assert "user" == ed_field_mappings.aggregatable_field_name("user")
assert "user.keyword" == ed_field_mappings.aggregatable_field_name("user")

def test_ecommerce_single_keyword_aggregatable_field(self):
ed_field_mappings = FieldMappings(
Expand All @@ -126,6 +126,7 @@ def test_ecommerce_single_non_existant_field(self):
with pytest.raises(KeyError):
ed_field_mappings.aggregatable_field_name("non_existant")

@pytest.mark.skip(reason="opensearch treats all fields in ecommerce df as aggregatable")
@pytest.mark.filterwarnings("ignore:Aggregations not supported")
def test_ecommerce_single_non_aggregatable_field(self):
ed_field_mappings = FieldMappings(
Expand Down
4 changes: 0 additions & 4 deletions tests/field_mappings/test_metric_source_fields_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,13 @@ def test_ecommerce_selected_non_metric_source_fields(self):
field_names = [
"category",
"currency",
"customer_birth_date",
"customer_first_name",
"user",
]
"""
Note: non of there are metric
category object
currency object
customer_birth_date datetime64[ns]
customer_first_name object
user object
"""
Expand All @@ -108,7 +106,6 @@ def test_ecommerce_selected_mixed_metric_source_fields(self):
field_names = [
"category",
"currency",
"customer_birth_date",
"customer_first_name",
"total_quantity",
"user",
Expand All @@ -117,7 +114,6 @@ def test_ecommerce_selected_mixed_metric_source_fields(self):
Note: one is metric
category object
currency object
customer_birth_date datetime64[ns]
customer_first_name object
total_quantity int64
user object
Expand Down

0 comments on commit 0b89f1f

Please sign in to comment.