Skip to content

Commit

Permalink
feat(retention): Enable person on events querying for retention (#9858)
Browse files Browse the repository at this point in the history
* add constance setting

* add minimum viable code

* update journeys_for

* working test case

* chore: remove eventserializer in unnecessary places

* change execution

* change imports

* fix typing and tests

* change format

* adjust

* reset

* split group path in trend breakdown

* fix: typing

* fix test

* fix test, rename

* automatically add persons to events in tests on flush

* test CI with person-on-events

* wip

* fix

* fix ci

* add comma

* reduce test explosion

* fix CH server image version #9743

* update conditional

* split person on events

* split person on events

* split properly

* fix test deps

* add persons on events for tests

* rm eventserializer changes

* upd

* fix tests

* streamline person on event tests

* remove cache dependence, even after dematerialization deletion

* disregard snapshot tests

* no casting for matrix variables

* fix actions problems

* try to limit blast radius

* Update snapshots

* why are these tests passing?

* stoopid

* fix trend tests

* fix more trend tests, add test for old query regression

* Update snapshots

* optimise persons

* feat(retention): Enable person on events querying for retention

* update snaps

* Update snapshots

* Update snapshots

* fix tests

Co-authored-by: eric <eeoneric@gmail.com>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored May 23, 2022
1 parent 351b311 commit 5e09eba
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 20 deletions.
12 changes: 10 additions & 2 deletions ee/clickhouse/queries/retention/clickhouse_retention.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,17 @@ def actors_in_period(self, filter: RetentionFilter, team: Team):


def build_returning_event_query(
filter: RetentionFilter, team: Team, aggregate_users_by_distinct_id: Optional[bool] = None
filter: RetentionFilter,
team: Team,
aggregate_users_by_distinct_id: Optional[bool] = None,
using_person_on_events: bool = False,
):
returning_event_query_templated, returning_event_params = RetentionEventsQuery(
filter=filter.with_data({"breakdowns": []}), # Avoid pulling in breakdown values from returning event query
team=team,
event_query_type=RetentionQueryType.RETURNING,
aggregate_users_by_distinct_id=aggregate_users_by_distinct_id,
using_person_on_events=using_person_on_events,
).get_query()

query = substitute_params(returning_event_query_templated, returning_event_params)
Expand All @@ -157,7 +161,10 @@ def build_returning_event_query(


def build_target_event_query(
filter: RetentionFilter, team: Team, aggregate_users_by_distinct_id: Optional[bool] = None
filter: RetentionFilter,
team: Team,
aggregate_users_by_distinct_id: Optional[bool] = None,
using_person_on_events: bool = False,
):
target_event_query_templated, target_event_params = RetentionEventsQuery(
filter=filter,
Expand All @@ -168,6 +175,7 @@ def build_target_event_query(
else RetentionQueryType.TARGET
),
aggregate_users_by_distinct_id=aggregate_users_by_distinct_id,
using_person_on_events=using_person_on_events,
).get_query()

query = substitute_params(target_event_query_templated, target_event_params)
Expand Down
10 changes: 8 additions & 2 deletions ee/clickhouse/queries/retention/retention_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,17 @@ def build_actor_activity_query(
person_ids
"""
returning_event_query = build_returning_event_query(
filter=filter, team=team, aggregate_users_by_distinct_id=aggregate_users_by_distinct_id
filter=filter,
team=team,
aggregate_users_by_distinct_id=aggregate_users_by_distinct_id,
using_person_on_events=team.actor_on_events_querying_enabled,
)

target_event_query = build_target_event_query(
filter=filter, team=team, aggregate_users_by_distinct_id=aggregate_users_by_distinct_id
filter=filter,
team=team,
aggregate_users_by_distinct_id=aggregate_users_by_distinct_id,
using_person_on_events=team.actor_on_events_querying_enabled,
)

all_params = {
Expand Down
34 changes: 28 additions & 6 deletions ee/clickhouse/queries/retention/retention_event_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from posthog.models.action.util import Action, format_action_filter
from posthog.models.filters.retention_filter import RetentionFilter
from posthog.models.team import Team
from posthog.models.utils import PersonPropertiesMode
from posthog.queries.util import format_ch_timestamp, get_trunc_func_ch


Expand All @@ -28,10 +29,14 @@ def __init__(
event_query_type: RetentionQueryType,
team: Team,
aggregate_users_by_distinct_id: Optional[bool] = None,
using_person_on_events: bool = False,
):
self._event_query_type = event_query_type
super().__init__(
filter=filter, team=team, override_aggregate_users_by_distinct_id=aggregate_users_by_distinct_id
filter=filter,
team=team,
override_aggregate_users_by_distinct_id=aggregate_users_by_distinct_id,
using_person_on_events=using_person_on_events,
)

self._trunc_func = get_trunc_func_ch(self._filter.period)
Expand Down Expand Up @@ -60,7 +65,7 @@ def get_query(self) -> Tuple[str, Dict[str, Any]]:
get_aggregation_target_field(
self._filter.aggregation_group_type_index,
self.EVENT_TABLE_ALIAS,
f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id",
f"{self.DISTINCT_ID_TABLE_ALIAS if not self._using_person_on_events else self.EVENT_TABLE_ALIAS}.person_id",
)
)
]
Expand All @@ -77,8 +82,8 @@ def get_query(self) -> Tuple[str, Dict[str, Any]]:
column = "properties"

if breakdown_type == "person":
table = "person"
column = "person_props"
table = "person" if not self._using_person_on_events else "events"
column = "person_props" if not self._using_person_on_events else "person_properties"

breakdown_values_expression = get_single_or_multi_property_string_expr(
breakdown=[breakdown["property"] for breakdown in self._filter.breakdowns],
Expand Down Expand Up @@ -131,7 +136,12 @@ def get_query(self) -> Tuple[str, Dict[str, Any]]:
date_query, date_params = self._get_date_filter()
self.params.update(date_params)

prop_query, prop_params = self._get_prop_groups(self._filter.property_groups)
prop_query, prop_params = self._get_prop_groups(
self._filter.property_groups,
person_properties_mode=PersonPropertiesMode.DIRECT_ON_EVENTS
if self._using_person_on_events
else PersonPropertiesMode.USING_PERSON_PROPERTIES_COLUMN,
)

self.params.update(prop_params)

Expand Down Expand Up @@ -179,12 +189,24 @@ def _determine_should_join_distinct_ids(self) -> None:
else:
self._should_join_distinct_ids = True

def _determine_should_join_persons(self) -> None:
    """Decide whether the query needs the person / distinct-id joins.

    Runs the ``EnterpriseEventQuery`` implementation first, then, when the
    query is in person-on-events mode, switches both joins off.
    """
    EnterpriseEventQuery._determine_should_join_persons(self)

    if not self._using_person_on_events:
        # Regular mode: keep whatever the base implementation decided.
        return

    # Person-on-events mode: presumably person data is read straight from
    # the events table, so neither join is wanted -- confirm against get_query.
    self._should_join_distinct_ids = False
    self._should_join_persons = False

def _get_entity_query(self, entity: Entity):
prepend = self._event_query_type
if entity.type == TREND_FILTER_TYPE_ACTIONS:
action = Action.objects.get(pk=entity.id)
action_query, params = format_action_filter(
team_id=self._team_id, action=action, prepend=prepend, use_loop=False
team_id=self._team_id,
action=action,
prepend=prepend,
use_loop=False,
person_properties_mode=PersonPropertiesMode.DIRECT_ON_EVENTS
if self._using_person_on_events
else PersonPropertiesMode.USING_PERSON_PROPERTIES_COLUMN,
)
condition = action_query
elif entity.type == TREND_FILTER_TYPE_EVENTS:
Expand Down
20 changes: 12 additions & 8 deletions ee/clickhouse/test/test_journeys.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ee.clickhouse.sql.events import EVENTS_DATA_TABLE
from posthog.client import sync_execute
from posthog.models import Person, PersonDistinctId, Team
from posthog.test.base import flush_persons_and_events
from posthog.test.base import _create_event, flush_persons_and_events


def journeys_for(
Expand Down Expand Up @@ -37,6 +37,9 @@ def journeys_for(
And clarifies the preconditions of the test
"""

def _create_event_from_args(**event):
return {**event}

flush_persons_and_events()
people = {}
events_to_create = []
Expand All @@ -53,7 +56,7 @@ def journeys_for(
event["timestamp"] = datetime.now()

events_to_create.append(
_create_event(
_create_event_from_args(
team=team,
distinct_id=distinct_id,
event=event["event"],
Expand All @@ -69,12 +72,12 @@ def journeys_for(
)
)

_create_all_events(events_to_create)
_create_all_events_raw(events_to_create)

return people


def _create_all_events(all_events: List[Dict]):
def _create_all_events_raw(all_events: List[Dict]):
parsed = ""
for event in all_events:
data: Dict[str, Any] = {
Expand Down Expand Up @@ -102,6 +105,11 @@ def _create_all_events(all_events: List[Dict]):
)


def create_all_events(all_events: List[dict]):
    """Create every event in ``all_events`` one at a time.

    Each entry is a dict of keyword arguments that is unpacked into a
    ``_create_event`` call; entries are processed in list order.
    """
    for event_kwargs in all_events:
        _create_event(**event_kwargs)


# We collect all events per test into an array and batch create the events to reduce creation time
@dataclasses.dataclass
class InMemoryEvent:
Expand All @@ -119,10 +127,6 @@ class InMemoryEvent:
group4_properties: Dict


def _create_event(**event):
return {**event}


def update_or_create_person(distinct_ids: List[str], team_id: int, **kwargs):
(person, _) = Person.objects.update_or_create(
persondistinctid__distinct_id__in=distinct_ids,
Expand Down
4 changes: 2 additions & 2 deletions ee/clickhouse/views/test/test_clickhouse_retention.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.test import TestCase
from django.test.client import Client

from ee.clickhouse.test.test_journeys import _create_all_events, update_or_create_person
from ee.clickhouse.test.test_journeys import create_all_events, update_or_create_person
from ee.clickhouse.util import ClickhouseTestMixin, snapshot_clickhouse_queries
from ee.clickhouse.views.test.funnel.util import EventPattern
from posthog.api.test.test_organization import create_organization
Expand Down Expand Up @@ -469,7 +469,7 @@ def test_can_get_actors_and_use_percent_char_filter(self):


def setup_user_activity_by_day(daily_activity, team):
_create_all_events(
create_all_events(
[
{"distinct_id": person_id, "team": team, "timestamp": timestamp, **event}
for timestamp, people in daily_activity.items()
Expand Down

0 comments on commit 5e09eba

Please sign in to comment.