Skip to content

Commit

Permalink
fix(server/pandas/filter): Fix datetime object comparisons
Browse files Browse the repository at this point in the history
relates to TCTC-4268

Signed-off-by: Luka Peschke <luka.peschke@toucantoco.com>
  • Loading branch information
lukapeschke committed Oct 11, 2022
1 parent 2c57501 commit eaef42d
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 8 deletions.
1 change: 1 addition & 0 deletions server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- PyPika: Postgres and Redshift translator now use `TO_TIMESTAMP` rather than `TIMESTAMP` for the `todate` step
- PyPika: The `text` step now does an explicit cast of the input text to the adequate text type
- PyPika: Google Big Query translator now use `PARSE_DATETIME` rather than `TIMESTAMP` for the `todate` step
- Pandas: Fixed the "filter" condition evaluation when operating with naive datetime objects

## [0.25.4] - 2022-10-06

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from datetime import datetime

from numpy.ma import logical_and, logical_or
from pandas import DataFrame, Series
from pandas import DataFrame, Series, Timestamp
from pandas import to_datetime as pd_to_datetime
from pandas.tseries.offsets import DateOffset

from weaverbird.backends.pandas_executor.steps.utils.dates import evaluate_relative_date
Expand All @@ -16,6 +19,18 @@
from weaverbird.pipeline.dates import RelativeDate


def _date_bound_condition_to_tz_aware_timestamp(value: str | RelativeDate | datetime) -> Timestamp:
if isinstance(value, RelativeDate):
value = evaluate_relative_date(value)
if isinstance(value, datetime):
tz = value.tzinfo or "UTC"
# Cannot pass a tz-aware datetime object with tz arg
return Timestamp(value.replace(tzinfo=None), tz=tz)
else: # str
ts = Timestamp(value)
return ts if ts.tzinfo else ts.replace(tz="UTC")


def apply_condition(condition: Condition, df: DataFrame) -> Series:
"""
Returns a boolean Series, that will be used to filter a DataFrame.
Expand Down Expand Up @@ -55,12 +70,11 @@ def apply_condition(condition: Condition, df: DataFrame) -> Series:
else:
raise NotImplementedError

value = condition.value
if isinstance(value, RelativeDate):
value = evaluate_relative_date(value)
value = _date_bound_condition_to_tz_aware_timestamp(condition.value)

# Remove time info from the column to filter on
column_without_time = df[condition.column] - DateOffset(
# Remove time info from the column to filter on. Using utc=True to ensure we have time-aware
# objects. Since we're only using this for comparison, the input column is not modified,
column_without_time = pd_to_datetime(df[condition.column], utc=True) - DateOffset(
hour=0, minute=0, second=0, microsecond=0, nanosecond=0
)
# Do the same with the value to compare it to
Expand Down
72 changes: 72 additions & 0 deletions server/tests/steps/fixtures/sales_df.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{
"schema":{
"fields":[
{
"name":"Transaction_date",
"type":"datetime",
"tz":"UTC"
},
{
"name":"Product",
"type":"string"
},
{
"name":"Price",
"type":"integer"
}
],
"pandas_version":"1.4.0"
},
"data":[
{
"Transaction_date":"2009-01-02T06:17:00.000Z",
"Product":"Product1",
"Price":1200
},
{
"Transaction_date":"2009-01-02T04:53:00.000Z",
"Product":"Product1",
"Price":1200
},
{
"Transaction_date":"2009-01-02T13:08:00.000Z",
"Product":"Product1",
"Price":1200
},
{
"Transaction_date":"2009-01-03T14:44:00.000Z",
"Product":"Product1",
"Price":1200
},
{
"Transaction_date":"2009-01-04T12:56:00.000Z",
"Product":"Product2",
"Price":3600
},
{
"Transaction_date":"2009-01-04T13:19:00.000Z",
"Product":"Product1",
"Price":1200
},
{
"Transaction_date":"2009-01-04T20:11:00.000Z",
"Product":"Product1",
"Price":1200
},
{
"Transaction_date":"2009-01-02T20:09:00.000Z",
"Product":"Product1",
"Price":1200
},
{
"Transaction_date":"2009-01-04T13:17:00.000Z",
"Product":"Product1",
"Price":1200
},
{
"Transaction_date":"2009-01-04T14:11:00.000Z",
"Product":"Product1",
"Price":1200
}
]
}
109 changes: 107 additions & 2 deletions server/tests/steps/test_filter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
import json
from datetime import datetime
from os.path import dirname
from os.path import join as path_join
from zoneinfo import ZoneInfo

import pytest
from pandas import DataFrame
from pandas import DataFrame, read_json
from pandas.testing import assert_frame_equal

from tests.utils import assert_dataframes_equals
from weaverbird.backends.pandas_executor.steps.filter import execute_filter
from weaverbird.pipeline.conditions import ComparisonCondition
from weaverbird.pipeline.conditions import (
ComparisonCondition,
ConditionComboAnd,
DateBoundCondition,
)
from weaverbird.pipeline.steps import FilterStep


Expand Down Expand Up @@ -278,3 +289,97 @@ def test_benchmark_filter(benchmark):
step = FilterStep(name="filter", condition={"column": "value", "operator": "lt", "value": 20})
result = benchmark(execute_filter, step, big_df)
assert len(result) == 20


@pytest.fixture
def date_df() -> DataFrame:
with open(path_join(dirname(__file__), "fixtures", "sales_df.json"), "r") as fd:
return read_json(fd, orient="table")


@pytest.fixture
def expected_date_filter_result() -> DataFrame:
return read_json(
json.dumps(
{
"schema": {
"fields": [
{"name": "Transaction_date", "type": "datetime", "tz": "UTC"},
{"name": "Product", "type": "string"},
{"name": "Price", "type": "integer"},
],
"pandas_version": "1.4.0",
},
"data": [
{
"Transaction_date": "2009-01-02T06:17:00.000Z",
"Product": "Product1",
"Price": 1200,
},
{
"Transaction_date": "2009-01-02T04:53:00.000Z",
"Product": "Product1",
"Price": 1200,
},
{
"Transaction_date": "2009-01-02T13:08:00.000Z",
"Product": "Product1",
"Price": 1200,
},
{
"Transaction_date": "2009-01-03T14:44:00.000Z",
"Product": "Product1",
"Price": 1200,
},
{
"Transaction_date": "2009-01-02T20:09:00.000Z",
"Product": "Product1",
"Price": 1200,
},
],
}
),
orient="table",
).reset_index(drop=True)


def test_date_filter(date_df: DataFrame, expected_date_filter_result: DataFrame):
# Datetimes
step = FilterStep(
condition=ConditionComboAnd(
and_=[
DateBoundCondition(
column="Transaction_date",
operator="from",
value=datetime(2009, 1, 2, tzinfo=ZoneInfo("UTC")), # tz-aware
),
DateBoundCondition(
column="Transaction_date",
operator="until",
value=datetime(2009, 1, 3), # naive
),
]
)
)

result = execute_filter(step=step, df=date_df).reset_index(drop=True)
assert_frame_equal(expected_date_filter_result, result)

# trying with string dates now
step = FilterStep(
condition=ConditionComboAnd(
and_=[
DateBoundCondition(
column="Transaction_date", operator="from", value="2009-01-02T00:00:00" # naive
),
DateBoundCondition(
column="Transaction_date",
operator="until",
value="2009-01-03T00:00:00+00:00", # tz-aware
),
]
)
)

result = execute_filter(step=step, df=date_df).reset_index(drop=True)
assert_frame_equal(expected_date_filter_result, result)

0 comments on commit eaef42d

Please sign in to comment.