fix: warming and filter #25674

Merged
merged 10 commits into from
Oct 18, 2024
11 changes: 9 additions & 2 deletions posthog/caching/warming.py
@@ -14,6 +14,7 @@
from posthog.caching.utils import largest_teams
from posthog.clickhouse.query_tagging import tag_queries
from posthog.errors import CHQueryErrorTooManySimultaneousQueries
from posthog.hogql.constants import LimitContext
from posthog.hogql_queries.query_cache import QueryCacheManager
from posthog.hogql_queries.legacy_compatibility.flagged_conversion_manager import conversion_to_query_based
from posthog.hogql_queries.query_runner import ExecutionMode
@@ -126,13 +127,18 @@ def schedule_warming_for_teams_task():
max_retries=3,
)
def warm_insight_cache_task(insight_id: int, dashboard_id: Optional[int]):
insight = Insight.objects.get(pk=insight_id)
try:
insight = Insight.objects.get(pk=insight_id)
except Insight.DoesNotExist:
logger.info(f"Warming insight cache failed 404 insight not found: {insight_id}")
return

dashboard = None

tag_queries(team_id=insight.team_id, insight_id=insight.pk, trigger="warmingV2")
if dashboard_id:
tag_queries(dashboard_id=dashboard_id)
dashboard = insight.dashboards.get(pk=dashboard_id)
dashboard = insight.dashboards.filter(pk=dashboard_id).first()
Contributor

Why do we need to change this? Insights can be added to a dashboard multiple times?

Contributor Author

get() throws an exception if the dashboard doesn't exist, but we were expecting None, which was leading to Sentry errors.
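
For illustration (not part of the PR), a minimal sketch of the ORM behaviour described above; the helper name is hypothetical and `insight.dashboards` is a standard Django related manager:

```python
# Hypothetical helper illustrating the difference discussed in this thread.

def load_dashboard_or_none(insight, dashboard_id):
    # .get() raises <Model>.DoesNotExist when no matching row exists; in the
    # warming task that exception surfaced as Sentry errors:
    #     dashboard = insight.dashboards.get(pk=dashboard_id)
    #
    # .filter(...).first() returns None instead of raising, which is what the
    # calling code expects here.
    return insight.dashboards.filter(pk=dashboard_id).first()
```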


with conversion_to_query_based(insight):
logger.info(f"Warming insight cache: {insight.pk} for team {insight.team_id} and dashboard {dashboard_id}")
@@ -145,6 +151,7 @@ def warm_insight_cache_task(insight_id: int, dashboard_id: Optional[int]):
# We need an execution mode with recent cache:
# - in case someone refreshed after this task was triggered
# - if insight + dashboard combinations have the same cache key, we prevent needless recalculations
limit_context=LimitContext.QUERY_ASYNC,
execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_BLOCKING_IF_STALE,
insight_id=insight_id,
dashboard_id=dashboard_id,
1 change: 1 addition & 0 deletions posthog/hogql/functions/mapping.py
@@ -340,6 +340,7 @@ def compare_types(arg_types: list[ConstantType], sig_arg_types: tuple[ConstantTy
"arraySplit": HogQLFunctionMeta("arraySplit", 2, None),
"arrayReverseFill": HogQLFunctionMeta("arrayReverseFill", 2, None),
"arrayReverseSplit": HogQLFunctionMeta("arrayReverseSplit", 2, None),
"arrayRotateLeft": HogQLFunctionMeta("arrayRotateLeft", 2, 2),
"arrayRotateRight": HogQLFunctionMeta("arrayRotateRight", 2, 2),
"arrayExists": HogQLFunctionMeta("arrayExists", 1, None),
"arrayAll": HogQLFunctionMeta("arrayAll", 1, None),
19 changes: 19 additions & 0 deletions posthog/hogql_queries/insights/funnels/base.py
@@ -89,6 +89,25 @@ def get_step_counts_query(self) -> ast.SelectQuery:
def get_step_counts_without_aggregation_query(self) -> ast.SelectQuery:
raise NotImplementedError()

# This is a simple heuristic to reduce the number of events we look at in UDF funnels (and thus serialize and send over)
# We remove an event if it matches one or zero steps and the same kind of event already appears both before and after it (with different timestamps)
# arrayRotateRight turns [1,2,3] into [3,1,2]
# arrayRotateLeft turns [1,2,3] into [2,3,1]
# For some reason, this uses much less memory than using indexing in ClickHouse to check the previous and next elements
def _udf_event_array_filter(self, timestamp_index: int, prop_val_index: int, steps_index: int):
return f"""arrayFilter(
(x, x_before, x_after) -> not (
length(x.{steps_index}) <= 1
and x.{steps_index} == x_before.{steps_index}
and x.{steps_index} == x_after.{steps_index}
and x.{prop_val_index} == x_before.{prop_val_index}
and x.{prop_val_index} == x_after.{prop_val_index}
and x.{timestamp_index} > x_before.{timestamp_index}
and x.{timestamp_index} < x_after.{timestamp_index}),
events_array,
arrayRotateRight(events_array, 1),
arrayRotateLeft(events_array, 1))"""

@cached_property
def breakdown_cohorts(self) -> list[Cohort]:
team, breakdown = self.context.team, self.context.breakdown
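For intuition (not part of the diff), here is a rough Python sketch of what the arrayFilter expression above computes, using list slicing in place of arrayRotateRight / arrayRotateLeft; the 0-based tuple indices are purely illustrative:

```python
# Illustrative Python equivalent of the ClickHouse pruning filter above.
# Each event is a tuple holding, among other fields, a timestamp, a
# property/breakdown value, and the list of funnel steps it matched.

def prune_events(events, timestamp_i, prop_val_i, steps_i):
    # Rotating the array lines each event up with its neighbours:
    # arrayRotateRight(events_array, 1) -> predecessor, arrayRotateLeft(events_array, 1) -> successor.
    before = events[-1:] + events[:-1]  # like arrayRotateRight(events_array, 1)
    after = events[1:] + events[:1]     # like arrayRotateLeft(events_array, 1)

    def keep(x, x_before, x_after):
        # Drop an event only if it matches at most one step and sits strictly
        # between two events with the same matched steps and property value.
        return not (
            len(x[steps_i]) <= 1
            and x[steps_i] == x_before[steps_i]
            and x[steps_i] == x_after[steps_i]
            and x[prop_val_i] == x_before[prop_val_i]
            and x[prop_val_i] == x_after[prop_val_i]
            and x[timestamp_i] > x_before[timestamp_i]
            and x[timestamp_i] < x_after[timestamp_i]
        )

    return [x for x, b, a in zip(events, before, after) if keep(x, b, a)]
```

Because the events array is sorted by timestamp, the wrap-around pairs produced by the rotations fail the strict timestamp checks, so the first and last events are always kept.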
9 changes: 6 additions & 3 deletions posthog/hogql_queries/insights/funnels/funnel_trends_udf.py
@@ -6,7 +6,6 @@
from posthog.hogql.constants import HogQLQuerySettings
from posthog.hogql.parser import parse_select, parse_expr
from posthog.hogql_queries.insights.funnels import FunnelTrends
from posthog.hogql_queries.insights.funnels.funnel_udf import udf_event_array_filter
from posthog.hogql_queries.insights.utils.utils import get_start_of_interval_hogql_str
from posthog.schema import BreakdownType, BreakdownAttributionType
from posthog.utils import DATERANGE_MAP, relative_date_parse
@@ -47,6 +46,9 @@ def matched_event_select(self):
"""
return ""

def udf_event_array_filter(self):
return self._udf_event_array_filter(1, 4, 5)

# This is the function that calls the UDF
# This is used by both the query itself and the actors query
def _inner_aggregation_query(self):
@@ -103,15 +105,16 @@ def _inner_aggregation_query(self):
_toUInt64(toDateTime({get_start_of_interval_hogql_str(self.context.interval.value, team=self.context.team, source='timestamp')})),
uuid,
{prop_selector},
arrayFilter((x) -> x != 0, [{steps}{exclusions}])))) as events_array,
arrayFilter((x) -> x != 0, [{steps}{exclusions}])
))) as events_array,
arrayJoin({fn}(
{from_step},
{max_steps},
{self.conversion_window_limit()},
'{breakdown_attribution_string}',
'{self.context.funnelsFilter.funnelOrderType}',
{prop_vals},
{udf_event_array_filter(self.context.funnelsFilter.funnelOrderType)}
{self.udf_event_array_filter()}
)) as af_tuple,
toTimeZone(toDateTime(_toUInt64(af_tuple.1)), '{self.context.team.timezone}') as entrance_period_start,
af_tuple.2 as success_bool,
29 changes: 11 additions & 18 deletions posthog/hogql_queries/insights/funnels/funnel_udf.py
@@ -3,28 +3,13 @@
from posthog.hogql import ast
from posthog.hogql.parser import parse_select, parse_expr
from posthog.hogql_queries.insights.funnels.base import FunnelBase
from posthog.schema import BreakdownType, BreakdownAttributionType, StepOrderValue
from posthog.schema import BreakdownType, BreakdownAttributionType
from posthog.utils import DATERANGE_MAP

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
HUMAN_READABLE_TIMESTAMP_FORMAT = "%-d-%b-%Y"


# This is used to reduce the number of events we look at in strict funnels
# We remove a non-matching event if there was already one before it (that don't have the same timestamp)
# arrayRotateRight turns [1,2,3] into [3,1,2]
# For some reason, this uses much less memory than using indexing in clickhouse to check the previous element
def udf_event_array_filter(funnelOrderType: StepOrderValue | None):
if funnelOrderType == "strict":
return f"""
arrayFilter(
(x, x2) -> not (empty(x.4) and empty(x2.4) and x.3 == x2.3 and x.1 > x2.1),
events_array,
arrayRotateRight(events_array, 1))
"""
return "events_array"


class FunnelUDF(FunnelBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -51,6 +36,9 @@ def matched_event_arrays_selects(self):
"""
return ""

def udf_event_array_filter(self):
return self._udf_event_array_filter(1, 3, 4)

# This is the function that calls the UDF
# This is used by both the query itself and the actors query
def _inner_aggregation_query(self):
@@ -92,14 +80,19 @@ def _inner_aggregation_query(self):
inner_select = parse_select(
f"""
SELECT
arraySort(t -> t.1, groupArray(tuple(toFloat(timestamp), uuid, {prop_selector}, arrayFilter((x) -> x != 0, [{steps}{exclusions}])))) as events_array,
arraySort(t -> t.1, groupArray(tuple(
toFloat(timestamp),
uuid,
{prop_selector},
arrayFilter((x) -> x != 0, [{steps}{exclusions}])
))) as events_array,
arrayJoin({fn}(
{self.context.max_steps},
{self.conversion_window_limit()},
'{breakdown_attribution_string}',
'{self.context.funnelsFilter.funnelOrderType}',
{prop_vals},
{udf_event_array_filter(self.context.funnelsFilter.funnelOrderType)}
{self.udf_event_array_filter()}
)) as af_tuple,
af_tuple.1 as step_reached,
af_tuple.1 + 1 as steps, -- Backward compatibility