Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ee/clickhouse/queries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
from .clickhouse_retention import ClickhouseRetention
from .clickhouse_session_recording import SessionRecording
from .clickhouse_stickiness import ClickhouseStickiness
from .paths.paths import ClickhousePaths
from .paths import ClickhousePaths
from .trends.clickhouse_trends import ClickhouseTrends
2 changes: 2 additions & 0 deletions ee/clickhouse/queries/paths/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .paths import ClickhousePaths
from .paths_persons import ClickhousePathsPersons
43 changes: 34 additions & 9 deletions ee/clickhouse/queries/paths/paths.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Literal, Optional, Tuple, Union
from typing import Dict, List, Literal, Optional, Tuple, Union, cast

from rest_framework.exceptions import ValidationError

Expand Down Expand Up @@ -57,12 +57,15 @@ def _exec_query(self) -> List[Tuple]:

def get_query(self) -> str:

if self._filter.funnel_paths and self._funnel_filter:
return self.get_path_query_by_funnel(funnel_filter=self._funnel_filter)
else:
return self.get_path_query()
path_query = self.get_path_query()
funnel_cte = ""

def get_path_query(self) -> str:
if self.should_query_funnel():
funnel_cte = self.get_path_query_funnel_cte(cast(Filter, self._funnel_filter))

return funnel_cte + path_query

def get_paths_per_person_query(self) -> str:
path_event_query, params = PathEventQuery(filter=self._filter, team_id=self._team.pk).get_query()
self.params.update(params)

Expand All @@ -74,8 +77,31 @@ def get_path_query(self) -> str:
path_event_query=path_event_query, boundary_event_filter=boundary_event_filter, target_clause=target_clause
)

def get_path_query_by_funnel(self, funnel_filter: Filter):
path_query = self.get_path_query()
def should_query_funnel(self) -> bool:
    """Tell whether the paths query has to be scoped by a funnel.

    Returns:
        True only when ``self._filter.funnel_paths`` and
        ``self._funnel_filter`` are both truthy, False otherwise.
    """
    return bool(self._filter.funnel_paths and self._funnel_filter)

def get_path_query(self) -> str:
    """Build the aggregate SQL over the per-person paths query.

    The per-person query from ``get_paths_per_person_query`` is embedded as a
    subquery. Rows are grouped by (source_event, target_event), counted, and
    their ``conversion_time`` averaged; rows with a NULL source are dropped
    and only the first 30 rows by descending count are kept.

    Returns:
        The SQL text, still containing whatever placeholders the per-person
        query carries.
    """
    per_person_sql = self.get_paths_per_person_query()

    return f"""
SELECT last_path_key as source_event,
path_key as target_event,
COUNT(*) AS event_count,
avg(conversion_time) AS average_conversion_time
FROM ({per_person_sql})
WHERE source_event IS NOT NULL
GROUP BY source_event,
target_event
ORDER BY event_count DESC,
source_event,
target_event
LIMIT 30
"""

def get_path_query_funnel_cte(self, funnel_filter: Filter):
funnel_persons_generator = ClickhouseFunnelPersons(
funnel_filter,
self._team,
Expand All @@ -91,7 +117,6 @@ def get_path_query_by_funnel(self, funnel_filter: Filter):
WITH {PathEventQuery.FUNNEL_PERSONS_ALIAS} AS (
{funnel_persons_query_new_params}
)
{path_query}
"""

def get_target_point_filter(self) -> str:
Expand Down
51 changes: 51 additions & 0 deletions ee/clickhouse/queries/paths/paths_persons.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from typing import cast

from ee.clickhouse.queries.paths.paths import ClickhousePaths
from ee.clickhouse.sql.funnels.funnel import FUNNEL_PERSONS_BY_STEP_SQL
from posthog.models import Person
from posthog.models.filters.filter import Filter


class ClickhousePathsPersons(ClickhousePaths):
    """Paths query variant that lists the persons found on one edge of the paths graph.

    It reuses the per-person paths query of ``ClickhousePaths`` and narrows it
    with the filter's ``path_start_key`` / ``path_end_key``, paginated through
    the filter's ``limit`` and ``offset``.
    """

    def _page_limit(self) -> int:
        """Return the page size: the filter's ``limit`` when truthy, else 100."""
        return self._filter.limit or 100

    def get_query(self):
        """Build the paginated SQL that selects ``person_id`` for the requested edge.

        Side effects:
            Writes ``limit`` and ``offset`` into ``self.params``; the helpers
            called here add their own parameters as well.

        Returns:
            The SQL text with ``%(limit)s`` / ``%(offset)s`` placeholders.
        """
        paths_per_person_query = self.get_paths_per_person_query()
        person_path_filter = self.get_person_path_filter()
        paths_funnel_cte = ""

        if self.should_query_funnel():
            paths_funnel_cte = self.get_path_query_funnel_cte(cast(Filter, self._funnel_filter))

        self.params["limit"] = self._page_limit()
        self.params["offset"] = self._filter.offset

        # With neither path key set the filter string is empty; a bare "WHERE"
        # would be invalid SQL, so the clause is left out entirely.
        where_clause = f"WHERE {person_path_filter}" if person_path_filter else ""

        return f"""
        {paths_funnel_cte}
        SELECT person_id
        FROM (
            {paths_per_person_query}
        )
        {where_clause}
        ORDER BY person_id
        LIMIT %(limit)s
        OFFSET %(offset)s
        """

    def get_person_path_filter(self) -> str:
        """Build the AND-joined conditions that pin the edge's start and end keys.

        Side effects:
            Stores ``path_start_key`` / ``path_end_key`` in ``self.params`` for
            each key that is set on the filter.

        Returns:
            The conditions joined by ``" AND "``, or an empty string when
            neither key is set.
        """
        conditions = []
        if self._filter.path_start_key:
            conditions.append("last_path_key = %(path_start_key)s")
            self.params["path_start_key"] = self._filter.path_start_key

        if self._filter.path_end_key:
            conditions.append("path_key = %(path_end_key)s")
            self.params["path_end_key"] = self._filter.path_end_key

        return " AND ".join(conditions)

    def _format_results(self, results):
        """Turn raw query rows into serialized persons plus a "page is full" flag.

        Args:
            results: Query rows whose first column is a person uuid.

        Returns:
            A tuple ``(serialized_people, page_is_full)``; the flag is True
            when the number of rows reaches the page size.
        """
        people = Person.objects.filter(team_id=self._team.pk, uuid__in=[row[0] for row in results])

        # Kept as a function-scope import, as in the original.
        # NOTE(review): presumably this avoids a circular import — confirm.
        from posthog.api.person import PersonSerializer

        # typing.cast performs no runtime conversion, so an unset limit used to
        # raise TypeError here; use the same fallback page size as get_query.
        return PersonSerializer(people, many=True).data, len(results) >= self._page_limit()
75 changes: 56 additions & 19 deletions ee/clickhouse/queries/test/test_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ee.clickhouse.materialized_columns.columns import materialize
from ee.clickhouse.models.event import create_event
from ee.clickhouse.queries import ClickhousePaths
from ee.clickhouse.queries.paths import ClickhousePathsPersons
from ee.clickhouse.queries.paths.path_event_query import PathEventQuery
from ee.clickhouse.util import ClickhouseTestMixin
from posthog.constants import (
Expand All @@ -31,6 +32,11 @@ def _create_event(**kwargs):


class TestClickhousePaths(ClickhouseTestMixin, paths_test_factory(ClickhousePaths, _create_event, Person.objects.create)): # type: ignore
def _get_people_at_path(self, filter, path_start, path_end, funnel_filter=None):
    """Run ClickhousePathsPersons for the edge path_start -> path_end.

    Returns the first column of every row produced by ``_exec_query``.
    """
    edge_filter = filter.with_data({"path_start_key": path_start, "path_end_key": path_end})
    rows = ClickhousePathsPersons(edge_filter, self.team, funnel_filter)._exec_query()
    return [person_row[0] for person_row in rows]

def test_denormalized_properties(self):
materialize("events", "$current_url")
materialize("events", "$screen_name")
Expand All @@ -46,7 +52,7 @@ def test_denormalized_properties(self):
def test_step_limit(self):

with freeze_time("2012-01-01T03:21:34.000Z"):
Person.objects.create(team_id=self.team.pk, distinct_ids=["fake"])
p1 = Person.objects.create(team_id=self.team.pk, distinct_ids=["fake"])
_create_event(
properties={"$current_url": "/1"}, distinct_id="fake", event="$pageview", team=self.team,
)
Expand All @@ -67,34 +73,40 @@ def test_step_limit(self):
filter = PathFilter(data={"step_limit": 2})
response = ClickhousePaths(team=self.team, filter=filter).run(team=self.team, filter=filter)

self.assertEqual(
response, [{"source": "1_/1", "target": "2_/2", "value": 1, "average_conversion_time": ONE_MINUTE}]
)
self.assertEqual(
response, [{"source": "1_/1", "target": "2_/2", "value": 1, "average_conversion_time": ONE_MINUTE}]
)
self.assertEqual([p1.uuid], self._get_people_at_path(filter, "1_/1", "2_/2"))
self.assertEqual([], self._get_people_at_path(filter, "2_/2", "3_/3"))

with freeze_time("2012-01-7T03:21:34.000Z"):
filter = PathFilter(data={"step_limit": 3})
response = ClickhousePaths(team=self.team, filter=filter).run(team=self.team, filter=filter)

self.assertEqual(
response,
[
{"source": "1_/1", "target": "2_/2", "value": 1, "average_conversion_time": ONE_MINUTE},
{"source": "2_/2", "target": "3_/3", "value": 1, "average_conversion_time": 2 * ONE_MINUTE},
],
)
self.assertEqual(
response,
[
{"source": "1_/1", "target": "2_/2", "value": 1, "average_conversion_time": ONE_MINUTE},
{"source": "2_/2", "target": "3_/3", "value": 1, "average_conversion_time": 2 * ONE_MINUTE},
],
)
self.assertEqual([p1.uuid], self._get_people_at_path(filter, "2_/2", "3_/3"))

with freeze_time("2012-01-7T03:21:34.000Z"):
filter = PathFilter(data={"step_limit": 4})
response = ClickhousePaths(team=self.team, filter=filter).run(team=self.team, filter=filter)

self.assertEqual(
response,
[
{"source": "1_/1", "target": "2_/2", "value": 1, "average_conversion_time": ONE_MINUTE},
{"source": "2_/2", "target": "3_/3", "value": 1, "average_conversion_time": 2 * ONE_MINUTE},
{"source": "3_/3", "target": "4_/4", "value": 1, "average_conversion_time": 3 * ONE_MINUTE},
],
)
self.assertEqual(
response,
[
{"source": "1_/1", "target": "2_/2", "value": 1, "average_conversion_time": ONE_MINUTE},
{"source": "2_/2", "target": "3_/3", "value": 1, "average_conversion_time": 2 * ONE_MINUTE},
{"source": "3_/3", "target": "4_/4", "value": 1, "average_conversion_time": 3 * ONE_MINUTE},
],
)
self.assertEqual([p1.uuid], self._get_people_at_path(filter, "1_/1", "2_/2"))
self.assertEqual([p1.uuid], self._get_people_at_path(filter, "2_/2", "3_/3"))
self.assertEqual([p1.uuid], self._get_people_at_path(filter, "3_/3", "4_/4"))

def test_step_conversion_times(self):

Expand Down Expand Up @@ -274,6 +286,16 @@ def test_path_by_funnel_after_dropoff(self):
},
],
)
self.assertEqual(20, len(self._get_people_at_path(path_filter, "1_step one", "2_step dropoff1", funnel_filter)))
self.assertEqual(
20, len(self._get_people_at_path(path_filter, "2_step dropoff1", "3_step dropoff2", funnel_filter))
)
self.assertEqual(
10, len(self._get_people_at_path(path_filter, "3_step dropoff2", "4_step branch", funnel_filter))
)
self.assertEqual(
0, len(self._get_people_at_path(path_filter, "4_step branch", "3_step dropoff2", funnel_filter))
)

def test_path_by_funnel_after_step(self):
self._create_sample_data_multiple_dropoffs()
Expand Down Expand Up @@ -475,6 +497,21 @@ def test_path_by_funnel_between_step(self):
},
],
)
self.assertEqual(
15, len(self._get_people_at_path(path_filter, "1_step one", "2_between_step_1_a", funnel_filter))
)
self.assertEqual(
15, len(self._get_people_at_path(path_filter, "2_between_step_1_a", "3_between_step_1_b", funnel_filter))
)
self.assertEqual(
10, len(self._get_people_at_path(path_filter, "3_between_step_1_b", "4_step two", funnel_filter))
)
self.assertEqual(
5, len(self._get_people_at_path(path_filter, "3_between_step_1_b", "4_between_step_1_c", funnel_filter))
)
self.assertEqual(
5, len(self._get_people_at_path(path_filter, "4_between_step_1_c", "5_step two", funnel_filter))
)

@test_with_materialized_columns(["$current_url"])
def test_paths_end(self):
Expand Down
Loading