Skip to content

Commit

Permalink
feat(correlation): Enable person-on-events querying (#9952)
Browse files Browse the repository at this point in the history
  • Loading branch information
neilkakkar authored May 26, 2022
1 parent ff66ccf commit 52252e0
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 261 deletions.
32 changes: 30 additions & 2 deletions ee/clickhouse/queries/funnels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ClickhouseFunnelBase(ABC):
_include_preceding_timestamp: Optional[bool]
_extra_event_fields: List[ColumnName]
_extra_event_properties: List[PropertyName]
_include_person_properties: Optional[bool]
_include_group_properties: List[int]

def __init__(
self,
Expand All @@ -45,6 +47,8 @@ def __init__(
include_timestamp: Optional[bool] = None,
include_preceding_timestamp: Optional[bool] = None,
base_uri: str = "/",
include_person_properties: Optional[bool] = None,
include_group_properties: Optional[List[int]] = None, # group_type_index for respective group type to get
) -> None:
self._filter = filter
self._team = team
Expand All @@ -56,6 +60,8 @@ def __init__(
}
self._include_timestamp = include_timestamp
self._include_preceding_timestamp = include_preceding_timestamp
self._include_person_properties = include_person_properties
self._include_group_properties = include_group_properties or []

# handle default if window isn't provided
if not self._filter.funnel_window_days and not self._filter.funnel_window_interval:
Expand Down Expand Up @@ -352,11 +358,20 @@ def _get_sorting_condition(self, curr_index: int, max_steps: int):
return f"if({' AND '.join(conditions)}, {curr_index}, {self._get_sorting_condition(curr_index - 1, max_steps)})"

def _get_inner_event_query(
self, entities=None, entity_name="events", skip_entity_filter=False, skip_step_filter=False, extra_fields=[]
self, entities=None, entity_name="events", skip_entity_filter=False, skip_step_filter=False,
) -> str:
parsed_extra_fields = f", {', '.join(extra_fields)}" if extra_fields else ""
entities_to_use = entities or self._filter.entities

extra_fields = []
if self._team.actor_on_events_querying_enabled:
if self._include_person_properties:
extra_fields.append("person_properties")

for group_index in self._include_group_properties:
extra_fields.append(f"group{group_index}_properties")

parsed_extra_fields = f", {', '.join(extra_fields)}" if extra_fields else ""

event_query, params = FunnelEventQuery(
filter=self._filter,
team=self._team,
Expand Down Expand Up @@ -694,3 +709,16 @@ def _get_breakdown_prop(self, group_remaining=False) -> str:
return ", prop"
else:
return ""

def _get_person_and_group_properties(self, aggregate: bool = False) -> str:
fields = []
if self._team.actor_on_events_querying_enabled:
if self._include_person_properties:
fields.append("any(person_properties) as person_properties" if aggregate else "person_properties")

for group_index in self._include_group_properties:
group_label = f"group{group_index}_properties"
fields.append(f"any({group_label}) as {group_label}" if aggregate else group_label)

parsed_fields = f", {', '.join(fields)}" if fields else ""
return parsed_fields
17 changes: 8 additions & 9 deletions ee/clickhouse/queries/funnels/funnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def get_step_counts_query(self):
inner_timestamps, outer_timestamps = self._get_timestamp_selects()

return f"""
SELECT aggregation_target, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {self._get_matching_event_arrays(max_steps)} {breakdown_clause} {outer_timestamps} FROM (
SELECT aggregation_target, steps, max(steps) over (PARTITION BY aggregation_target {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {self._get_matching_events(max_steps)} {breakdown_clause} {inner_timestamps} FROM (
SELECT aggregation_target, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {self._get_matching_event_arrays(max_steps)} {breakdown_clause} {outer_timestamps} {self._get_person_and_group_properties(aggregate=True)} FROM (
SELECT aggregation_target, steps, max(steps) over (PARTITION BY aggregation_target {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {self._get_matching_events(max_steps)} {breakdown_clause} {inner_timestamps} {self._get_person_and_group_properties()} FROM (
{steps_per_person_query}
)
) GROUP BY aggregation_target, steps {breakdown_clause}
Expand Down Expand Up @@ -142,7 +142,7 @@ def get_step_counts_without_aggregation_query(self):
exclusion_clause = self._get_exclusion_condition()

return f"""
SELECT *, {self._get_sorting_condition(max_steps, max_steps)} AS steps {exclusion_clause} {self._get_step_times(max_steps)}{self._get_matching_events(max_steps)} {breakdown_query} FROM (
SELECT *, {self._get_sorting_condition(max_steps, max_steps)} AS steps {exclusion_clause} {self._get_step_times(max_steps)}{self._get_matching_events(max_steps)} {breakdown_query} {self._get_person_and_group_properties()} FROM (
{formatted_query}
) WHERE step_0 = 1
{'AND exclusion = 0' if exclusion_clause else ''}
Expand Down Expand Up @@ -187,9 +187,8 @@ def get_comparison_cols(self, level_index: int, max_steps: int):
return ", ".join(cols)

def build_step_subquery(
self, level_index: int, max_steps: int, event_names_alias: str = "events", extra_fields: List[str] = []
self, level_index: int, max_steps: int, event_names_alias: str = "events",
):
parsed_extra_fields = f", {', '.join(extra_fields)}" if extra_fields else ""

if level_index >= max_steps:
return f"""
Expand All @@ -198,8 +197,8 @@ def build_step_subquery(
timestamp,
{self._get_partition_cols(1, max_steps)}
{self._get_breakdown_prop(group_remaining=True)}
{parsed_extra_fields}
FROM ({self._get_inner_event_query(entity_name=event_names_alias, extra_fields=extra_fields)})
{self._get_person_and_group_properties()}
FROM ({self._get_inner_event_query(entity_name=event_names_alias)})
"""
else:
return f"""
Expand All @@ -208,14 +207,14 @@ def build_step_subquery(
timestamp,
{self._get_partition_cols(level_index, max_steps)}
{self._get_breakdown_prop()}
{parsed_extra_fields}
{self._get_person_and_group_properties()}
FROM (
SELECT
aggregation_target,
timestamp,
{self.get_comparison_cols(level_index, max_steps)}
{self._get_breakdown_prop()}
{parsed_extra_fields}
{self._get_person_and_group_properties()}
FROM ({self.build_step_subquery(level_index + 1, max_steps)})
)
"""
103 changes: 77 additions & 26 deletions ee/clickhouse/queries/funnels/funnel_correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,23 @@ def __init__(
)
filter = Filter(data=filter_data)

self.query_person_properties = False
self.query_group_properties = False
if (
self._team.actor_on_events_querying_enabled
and self._filter.correlation_type == FunnelCorrelationType.PROPERTIES
):
# When dealing with properties, make sure funnel response comes with properties
# so we don't have to join on persons/groups to get these properties again
if filter.aggregation_group_type_index is not None:
self.query_group_properties = True
else:
self.query_person_properties = True

self.include_funnel_group_properties: list = [
filter.aggregation_group_type_index
] if self.query_group_properties else []

funnel_order_actor_class = get_funnel_order_actor_class(filter)

self._funnel_actors_generator = funnel_order_actor_class(
Expand All @@ -142,6 +159,8 @@ def __init__(
# NOTE: we don't need these as we have all the information we need to
# deduce if the person was successful or not
include_preceding_timestamp=False,
include_person_properties=self.query_person_properties,
include_group_properties=self.include_funnel_group_properties,
)

def support_autocapture_elements(self) -> bool:
Expand Down Expand Up @@ -384,21 +403,26 @@ def get_properties_query(self) -> Tuple[str, Dict[str, Any]]:
return query, params

def _get_aggregation_target_join_query(self) -> str:
aggregation_person_join = f"""
JOIN ({get_team_distinct_ids_query(self._team.pk)}) AS pdi
ON pdi.distinct_id = events.distinct_id
-- NOTE: I would love to right join here, so we count get total
-- success/failure numbers in one pass, but this causes out of memory
-- error mentioning issues with right filling. I'm sure there's a way
-- to do it but lifes too short.
JOIN funnel_actors AS actors
ON pdi.person_id = actors.actor_id

if self._team.actor_on_events_querying_enabled:
aggregation_person_join = f"""
JOIN funnel_actors as actors
ON event.person_id = actors.actor_id
"""

# :KLUDGE: aggregation_target is called person_id in funnel people CTEs.
# Since supporting that properly involves updating everything that uses the CTE to rename person_id,
# keeping it as-is for now
else:
aggregation_person_join = f"""
JOIN ({get_team_distinct_ids_query(self._team.pk)}) AS pdi
ON pdi.distinct_id = events.distinct_id
-- NOTE: I would love to right join here, so we count get total
-- success/failure numbers in one pass, but this causes out of memory
-- error mentioning issues with right filling. I'm sure there's a way
-- to do it but lifes too short.
JOIN funnel_actors AS actors
ON pdi.person_id = actors.actor_id
"""

aggregation_group_join = f"""
JOIN funnel_actors AS actors
ON actors.actor_id = events.$group_{self._filter.aggregation_group_type_index}
Expand Down Expand Up @@ -446,6 +470,9 @@ def _get_events_join_query(self) -> str:
"""

def _get_aggregation_join_query(self):
if self._team.actor_on_events_querying_enabled:
return "", {}

if self._filter.aggregation_group_type_index is None:
person_query, person_query_params = PersonQuery(
self._filter, self._team.pk, EnterpriseColumnOptimizer(self._filter, self._team.pk)
Expand All @@ -463,12 +490,19 @@ def _get_aggregation_join_query(self):

def _get_properties_prop_clause(self):

group_properties_field = f"groups_{self._filter.aggregation_group_type_index}.group_properties_{self._filter.aggregation_group_type_index}"
aggregation_properties_alias = (
PersonQuery.PERSON_PROPERTIES_ALIAS
if self._filter.aggregation_group_type_index is None
else group_properties_field
)
if self._team.actor_on_events_querying_enabled:
group_properties_field = f"group{self._filter.aggregation_group_type_index}_properties"
aggregation_properties_alias = (
"person_properties" if self._filter.aggregation_group_type_index is None else group_properties_field
)

else:
group_properties_field = f"groups_{self._filter.aggregation_group_type_index}.group_properties_{self._filter.aggregation_group_type_index}"
aggregation_properties_alias = (
PersonQuery.PERSON_PROPERTIES_ALIAS
if self._filter.aggregation_group_type_index is None
else group_properties_field
)

if "$all" in cast(list, self._filter.correlation_property_names):
map_expr = trim_quotes_expr(f"JSONExtractRaw({aggregation_properties_alias}, x)")
Expand All @@ -491,11 +525,17 @@ def _get_properties_prop_clause(self):
param_name = f"property_name_{index}"
if self._filter.aggregation_group_type_index is not None:
expression, _ = get_property_string_expr(
"groups", property_name, f"%({param_name})s", group_properties_field
"groups" if not self._team.actor_on_events_querying_enabled else "events",
property_name,
f"%({param_name})s",
aggregation_properties_alias,
)
else:
expression, _ = get_property_string_expr(
"person", property_name, f"%({param_name})s", PersonQuery.PERSON_PROPERTIES_ALIAS,
"person" if not self._team.actor_on_events_querying_enabled else "events",
property_name,
f"%({param_name})s",
aggregation_properties_alias,
)
person_property_params[param_name] = property_name
person_property_expressions.append(expression)
Expand Down Expand Up @@ -696,11 +736,18 @@ def construct_person_properties_people_url(self, success: bool, event_definition
# persons endpoint, with the breakdown value set, and we assume that
# event.event will be of the format "{property_name}::{property_value}"
property_name, property_value = event_definition["event"].split("::")
prop_type = "group" if self._filter.aggregation_group_type_index else "person"
params = self._filter.with_data(
{
"funnel_correlation_person_converted": "true" if success else "false",
"funnel_correlation_property_values": [
{"key": property_name, "value": property_value, "type": "person", "operator": "exact"}
{
"key": property_name,
"value": property_value,
"type": prop_type,
"operator": "exact",
"group_type_index": self._filter.aggregation_group_type_index,
}
],
}
).to_params()
Expand Down Expand Up @@ -755,10 +802,14 @@ def get_partial_event_contingency_tables(self) -> Tuple[List[EventContingencyTab
)

def get_funnel_actors_cte(self) -> Tuple[str, Dict[str, Any]]:
    """Return (query, params) for the funnel-actors CTE.

    Always requests the step/timestamp bookkeeping fields; additionally
    requests `person_properties` / `group{N}_properties` columns when the
    correlation query flagged them (so the properties correlation query can
    read them straight from the CTE instead of re-joining persons/groups).

    Fix: the stale pre-refactor body (an unconditional early `return` with the
    bare extra_fields list) was left above the new logic, making the
    person/group property columns unreachable dead code; it is removed here.
    """
    extra_fields = ["steps", "final_timestamp", "first_timestamp"]

    if self.query_person_properties:
        extra_fields.append("person_properties")

    if self.query_group_properties:
        for group_index in self.include_funnel_group_properties:
            extra_fields.append(f"group{group_index}_properties")

    return self._funnel_actors_generator.actor_query(limit_actors=False, extra_fields=extra_fields)

@staticmethod
def are_results_insignificant(event_contingency_table: EventContingencyTable) -> bool:
Expand Down
Loading

0 comments on commit 52252e0

Please sign in to comment.