Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(correlation): Enable person-on-events querying #9952

Merged
merged 6 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
wip
  • Loading branch information
neilkakkar committed May 24, 2022
commit 48b357b8378142f3633976ca56080d1b64adf120
44 changes: 42 additions & 2 deletions ee/clickhouse/queries/funnels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ClickhouseFunnelBase(ABC):
_include_preceding_timestamp: Optional[bool]
_extra_event_fields: List[ColumnName]
_extra_event_properties: List[PropertyName]
_include_person_properties: Optional[bool]
_include_group_properties: List[int]

def __init__(
self,
Expand All @@ -45,6 +47,8 @@ def __init__(
include_timestamp: Optional[bool] = None,
include_preceding_timestamp: Optional[bool] = None,
base_uri: str = "/",
include_person_properties: Optional[bool] = None,
include_group_properties: Optional[List[int]] = None, # group_type_index for respective group type to get
) -> None:
self._filter = filter
self._team = team
Expand All @@ -56,6 +60,8 @@ def __init__(
}
self._include_timestamp = include_timestamp
self._include_preceding_timestamp = include_preceding_timestamp
self._include_person_properties = include_person_properties
self._include_group_properties = include_group_properties or []

# handle default if window isn't provided
if not self._filter.funnel_window_days and not self._filter.funnel_window_interval:
Expand Down Expand Up @@ -352,11 +358,20 @@ def _get_sorting_condition(self, curr_index: int, max_steps: int):
return f"if({' AND '.join(conditions)}, {curr_index}, {self._get_sorting_condition(curr_index - 1, max_steps)})"

def _get_inner_event_query(
self, entities=None, entity_name="events", skip_entity_filter=False, skip_step_filter=False, extra_fields=[]
self, entities=None, entity_name="events", skip_entity_filter=False, skip_step_filter=False,
) -> str:
parsed_extra_fields = f", {', '.join(extra_fields)}" if extra_fields else ""
entities_to_use = entities or self._filter.entities

extra_fields = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Can reuse _get_person_and_group_properties here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra_fields is reused below, hence I didn't 😅

if self._team.actor_on_events_querying_enabled:
if self._include_person_properties:
extra_fields.append("person_properties")

for group_index in self._include_group_properties:
extra_fields.append(f"group{group_index}_properties")

parsed_extra_fields = f", {', '.join(extra_fields)}" if extra_fields else ""

event_query, params = FunnelEventQuery(
filter=self._filter,
team=self._team,
Expand Down Expand Up @@ -660,3 +675,28 @@ def _get_breakdown_prop(self, group_remaining=False) -> str:
return ", prop"
else:
return ""

def _get_person_and_group_properties(self) -> str:
fields = []
if self._team.actor_on_events_querying_enabled:
if self._include_person_properties:
fields.append("person_properties")

for group_index in self._include_group_properties:
fields.append(f"group{group_index}_properties")

parsed_fields = f", {', '.join(fields)}" if fields else ""
return parsed_fields

def _get_person_and_group_properties_aggregate(self) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Can consider just making _get_person_and_group_properties an "aggregate" bool field that determines how the select field should look.

** These two NITS were mainly readability. I was confused at first why it was repeating

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, done!

fields = []
if self._team.actor_on_events_querying_enabled:
if self._include_person_properties:
fields.append("any(person_properties) as person_properties")

for group_index in self._include_group_properties:
group_label = f"group{group_index}_properties"
fields.append(f"any({group_label}) as {group_label}")

parsed_fields = f", {', '.join(fields)}" if fields else ""
return parsed_fields
17 changes: 8 additions & 9 deletions ee/clickhouse/queries/funnels/funnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def get_step_counts_query(self):
inner_timestamps, outer_timestamps = self._get_timestamp_selects()

return f"""
SELECT aggregation_target, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {self._get_matching_event_arrays(max_steps)} {breakdown_clause} {outer_timestamps} FROM (
SELECT aggregation_target, steps, max(steps) over (PARTITION BY aggregation_target {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {self._get_matching_events(max_steps)} {breakdown_clause} {inner_timestamps} FROM (
SELECT aggregation_target, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {self._get_matching_event_arrays(max_steps)} {breakdown_clause} {outer_timestamps} {self._get_person_and_group_properties_aggregate()} FROM (
SELECT aggregation_target, steps, max(steps) over (PARTITION BY aggregation_target {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {self._get_matching_events(max_steps)} {breakdown_clause} {inner_timestamps} {self._get_person_and_group_properties()} FROM (
{steps_per_person_query}
)
) GROUP BY aggregation_target, steps {breakdown_clause}
Expand Down Expand Up @@ -142,7 +142,7 @@ def get_step_counts_without_aggregation_query(self):
exclusion_clause = self._get_exclusion_condition()

return f"""
SELECT *, {self._get_sorting_condition(max_steps, max_steps)} AS steps {exclusion_clause} {self._get_step_times(max_steps)}{self._get_matching_events(max_steps)} {breakdown_query} FROM (
SELECT *, {self._get_sorting_condition(max_steps, max_steps)} AS steps {exclusion_clause} {self._get_step_times(max_steps)}{self._get_matching_events(max_steps)} {breakdown_query} {self._get_person_and_group_properties()} FROM (
{formatted_query}
) WHERE step_0 = 1
{'AND exclusion = 0' if exclusion_clause else ''}
Expand Down Expand Up @@ -187,9 +187,8 @@ def get_comparison_cols(self, level_index: int, max_steps: int):
return ", ".join(cols)

def build_step_subquery(
self, level_index: int, max_steps: int, event_names_alias: str = "events", extra_fields: List[str] = []
self, level_index: int, max_steps: int, event_names_alias: str = "events",
):
parsed_extra_fields = f", {', '.join(extra_fields)}" if extra_fields else ""

if level_index >= max_steps:
return f"""
Expand All @@ -198,8 +197,8 @@ def build_step_subquery(
timestamp,
{self._get_partition_cols(1, max_steps)}
{self._get_breakdown_prop(group_remaining=True)}
{parsed_extra_fields}
FROM ({self._get_inner_event_query(entity_name=event_names_alias, extra_fields=extra_fields)})
{self._get_person_and_group_properties()}
FROM ({self._get_inner_event_query(entity_name=event_names_alias)})
"""
else:
return f"""
Expand All @@ -208,14 +207,14 @@ def build_step_subquery(
timestamp,
{self._get_partition_cols(level_index, max_steps)}
{self._get_breakdown_prop()}
{parsed_extra_fields}
{self._get_person_and_group_properties()}
FROM (
SELECT
aggregation_target,
timestamp,
{self.get_comparison_cols(level_index, max_steps)}
{self._get_breakdown_prop()}
{parsed_extra_fields}
{self._get_person_and_group_properties()}
FROM ({self.build_step_subquery(level_index + 1, max_steps)})
)
"""
107 changes: 81 additions & 26 deletions ee/clickhouse/queries/funnels/funnel_correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def __init__(
filter: Filter, #  Used to filter people
team: Team, # Used to partition by team
base_uri: str = "/", # Used to generate absolute urls
include_funnel_person_properties: Optional[bool] = None,
include_funnel_group_properties: Optional[List[int]] = None,
) -> None:
self._filter = filter
self._team = team
Expand All @@ -130,6 +132,25 @@ def __init__(
)
filter = Filter(data=filter_data)

self.query_person_properties = False
self.query_group_properties = False
if (
self._team.actor_on_events_querying_enabled
and self._filter.correlation_type == FunnelCorrelationType.PROPERTIES
):
# When dealing with properties, make sure funnel response comes with properties
# so we don't have to join on persons/groups to get these properties again
if filter.aggregation_group_type_index is not None:
self.query_group_properties = True
else:
self.query_person_properties = True

provided_funnel_group_index = include_funnel_group_properties or []
self.include_funnel_group_properties: list = [
*provided_funnel_group_index,
filter.aggregation_group_type_index,
] if self.query_group_properties else provided_funnel_group_index

funnel_order_actor_class = get_funnel_order_actor_class(filter)

self._funnel_actors_generator = funnel_order_actor_class(
Expand All @@ -142,6 +163,8 @@ def __init__(
# NOTE: we don't need these as we have all the information we need to
# deduce if the person was successful or not
include_preceding_timestamp=False,
include_person_properties=include_funnel_person_properties or self.query_person_properties,
include_group_properties=self.include_funnel_group_properties,
)

def support_autocapture_elements(self) -> bool:
Expand Down Expand Up @@ -384,21 +407,26 @@ def get_properties_query(self) -> Tuple[str, Dict[str, Any]]:
return query, params

def _get_aggregation_target_join_query(self) -> str:
aggregation_person_join = f"""
JOIN ({get_team_distinct_ids_query(self._team.pk)}) AS pdi
ON pdi.distinct_id = events.distinct_id

-- NOTE: I would love to right join here, so we count get total
-- success/failure numbers in one pass, but this causes out of memory
-- error mentioning issues with right filling. I'm sure there's a way
-- to do it but lifes too short.
JOIN funnel_actors AS actors
ON pdi.person_id = actors.actor_id

if self._team.actor_on_events_querying_enabled:
aggregation_person_join = f"""
JOIN funnel_actors as actors
ON event.person_id = actors.actor_id
"""

# :KLUDGE: aggregation_target is called person_id in funnel people CTEs.
# Since supporting that properly involves updating everything that uses the CTE to rename person_id,
# keeping it as-is for now
else:
aggregation_person_join = f"""
JOIN ({get_team_distinct_ids_query(self._team.pk)}) AS pdi
ON pdi.distinct_id = events.distinct_id

-- NOTE: I would love to right join here, so we count get total
-- success/failure numbers in one pass, but this causes out of memory
-- error mentioning issues with right filling. I'm sure there's a way
-- to do it but lifes too short.
JOIN funnel_actors AS actors
ON pdi.person_id = actors.actor_id
"""

aggregation_group_join = f"""
JOIN funnel_actors AS actors
ON actors.actor_id = events.$group_{self._filter.aggregation_group_type_index}
Expand Down Expand Up @@ -446,6 +474,9 @@ def _get_events_join_query(self) -> str:
"""

def _get_aggregation_join_query(self):
if self._team.actor_on_events_querying_enabled:
return "", {}

if self._filter.aggregation_group_type_index is None:
person_query, person_query_params = PersonQuery(
self._filter, self._team.pk, EnterpriseColumnOptimizer(self._filter, self._team.pk)
Expand All @@ -463,12 +494,19 @@ def _get_aggregation_join_query(self):

def _get_properties_prop_clause(self):

group_properties_field = f"groups_{self._filter.aggregation_group_type_index}.group_properties_{self._filter.aggregation_group_type_index}"
aggregation_properties_alias = (
PersonQuery.PERSON_PROPERTIES_ALIAS
if self._filter.aggregation_group_type_index is None
else group_properties_field
)
if self._team.actor_on_events_querying_enabled:
group_properties_field = f"group{self._filter.aggregation_group_type_index}_properties"
aggregation_properties_alias = (
"person_properties" if self._filter.aggregation_group_type_index is None else group_properties_field
)

else:
group_properties_field = f"groups_{self._filter.aggregation_group_type_index}.group_properties_{self._filter.aggregation_group_type_index}"
aggregation_properties_alias = (
PersonQuery.PERSON_PROPERTIES_ALIAS
if self._filter.aggregation_group_type_index is None
else group_properties_field
)

if "$all" in cast(list, self._filter.correlation_property_names):
map_expr = trim_quotes_expr(f"JSONExtractRaw({aggregation_properties_alias}, x)")
Expand All @@ -491,11 +529,17 @@ def _get_properties_prop_clause(self):
param_name = f"property_name_{index}"
if self._filter.aggregation_group_type_index is not None:
expression, _ = get_property_string_expr(
"groups", property_name, f"%({param_name})s", group_properties_field
"groups" if not self._team.actor_on_events_querying_enabled else "events",
property_name,
f"%({param_name})s",
aggregation_properties_alias,
)
else:
expression, _ = get_property_string_expr(
"person", property_name, f"%({param_name})s", PersonQuery.PERSON_PROPERTIES_ALIAS,
"person" if not self._team.actor_on_events_querying_enabled else "events",
property_name,
f"%({param_name})s",
aggregation_properties_alias,
)
person_property_params[param_name] = property_name
person_property_expressions.append(expression)
Expand Down Expand Up @@ -696,11 +740,18 @@ def construct_person_properties_people_url(self, success: bool, event_definition
# persons endpoint, with the breakdown value set, and we assume that
# event.event will be of the format "{property_name}::{property_value}"
property_name, property_value = event_definition["event"].split("::")
prop_type = "group" if self._filter.aggregation_group_type_index else "person"
params = self._filter.with_data(
{
"funnel_correlation_person_converted": "true" if success else "false",
"funnel_correlation_property_values": [
{"key": property_name, "value": property_value, "type": "person", "operator": "exact"}
{
"key": property_name,
"value": property_value,
"type": prop_type,
"operator": "exact",
"group_type_index": self._filter.aggregation_group_type_index,
}
],
}
).to_params()
Expand Down Expand Up @@ -755,10 +806,14 @@ def get_partial_event_contingency_tables(self) -> Tuple[List[EventContingencyTab
)

def get_funnel_actors_cte(self) -> Tuple[str, Dict[str, Any]]:
    """Build the funnel-actors CTE (query string + params) used by the
    correlation queries.

    Always requests steps and timestamps; additionally requests
    person/group property columns when person-on-events querying decided
    (in __init__) that properties should come from the funnel query
    directly instead of a join on persons/groups.
    """
    extra_fields = ["steps", "final_timestamp", "first_timestamp"]

    if self.query_person_properties:
        extra_fields.append("person_properties")

    if self.query_group_properties:
        # One column per group_type_index collected in __init__.
        for group_index in self.include_funnel_group_properties:
            extra_fields.append(f"group{group_index}_properties")

    return self._funnel_actors_generator.actor_query(limit_actors=False, extra_fields=extra_fields)

@staticmethod
def are_results_insignificant(event_contingency_table: EventContingencyTable) -> bool:
Expand Down
Loading