
feat(person-on-events): Person on events trends #9645

Merged
merged 70 commits into from
May 23, 2022
Changes from 6 commits

70 commits
9dec8ec
add constance setting
EDsCODE May 4, 2022
e0a5519
add minimum viable code
EDsCODE May 4, 2022
19250ba
update journeys_for
EDsCODE May 4, 2022
1b8616a
working test case
EDsCODE May 4, 2022
e50541b
chore: remove eventserializer in unnecessary places
EDsCODE May 5, 2022
b6501f0
change execution
EDsCODE May 5, 2022
de8179f
change imports
EDsCODE May 5, 2022
87a62ad
fix typing and tests
EDsCODE May 10, 2022
dfcb658
change format
EDsCODE May 10, 2022
59e7cbc
adjust
EDsCODE May 10, 2022
9f9ac5d
reset
EDsCODE May 10, 2022
347dcbb
Merge branch 'refactor-eventserializer' into person-on-events-trends
EDsCODE May 10, 2022
602c967
Merge branch 'master' into refactor-eventserializer
EDsCODE May 10, 2022
7d8d518
Merge branch 'refactor-eventserializer' into person-on-events-trends
EDsCODE May 10, 2022
9f2fb9a
split group path in trend breakdown
EDsCODE May 10, 2022
1126c78
fix: typing
EDsCODE May 10, 2022
83b2ac6
fix test
EDsCODE May 11, 2022
992e887
fix test, rename
neilkakkar May 16, 2022
e93f3e5
automatically add persons to events in tests on flush
neilkakkar May 16, 2022
0c7fb10
test CI with person-on-events
neilkakkar May 17, 2022
33a6235
wip
neilkakkar May 17, 2022
1d70928
fix
neilkakkar May 17, 2022
a7aeef4
fix ci
neilkakkar May 17, 2022
6b4a263
merge master
neilkakkar May 17, 2022
a209368
add comma
neilkakkar May 17, 2022
79a06a3
reduce test explosion
neilkakkar May 17, 2022
f17b33b
fix CH server image version #9743
neilkakkar May 17, 2022
6155027
update conditional
neilkakkar May 17, 2022
a38c21c
split person on events
neilkakkar May 17, 2022
1bfbf41
split person on events
neilkakkar May 17, 2022
fe6b5cd
split properly
neilkakkar May 17, 2022
c0b9375
fix test deps
neilkakkar May 17, 2022
110b832
add persons on events for tests
neilkakkar May 17, 2022
a6da68f
rm eventserializer changes
neilkakkar May 17, 2022
89accdf
upd
neilkakkar May 17, 2022
37a1a9c
Merge branch 'master' of github.com:PostHog/posthog into person-event…
neilkakkar May 17, 2022
72db871
fix tests
neilkakkar May 17, 2022
4a76432
streamline person on event tests
neilkakkar May 17, 2022
d4af83f
remove cache dependence, even after dematerialization deletion
neilkakkar May 17, 2022
e44e4d3
disregard snapshot tests
neilkakkar May 18, 2022
aa6a840
no casting for matrix variables
neilkakkar May 18, 2022
5513b73
resolve conflicts
neilkakkar May 18, 2022
f86fde9
fix actions problems
neilkakkar May 18, 2022
8358c0f
try to limit blast radius
neilkakkar May 18, 2022
0d67875
Update snapshots
github-actions[bot] May 18, 2022
342affa
why are these tests passing?
neilkakkar May 18, 2022
784ce42
Merge branch 'person-on-events-trends' of github.com:PostHog/posthog …
neilkakkar May 18, 2022
ff27e6d
stoopid
neilkakkar May 18, 2022
7c7223d
fix trend tests
neilkakkar May 18, 2022
4373cec
fix more trend tests, add test for old query regression
neilkakkar May 19, 2022
0cff27c
resolve conflicts
neilkakkar May 19, 2022
64f2848
Update snapshots
github-actions[bot] May 19, 2022
eb8611a
optimise persons
neilkakkar May 19, 2022
05a5116
Update snapshots
github-actions[bot] May 19, 2022
3675134
Update snapshots
github-actions[bot] May 19, 2022
1875429
add all snapshots automatically
neilkakkar May 19, 2022
0d8ab9b
Merge branch 'person-on-events-trends' of github.com:PostHog/posthog …
neilkakkar May 19, 2022
85a8298
fix ci
neilkakkar May 19, 2022
cd2595d
save
neilkakkar May 19, 2022
cdce931
Update snapshots
github-actions[bot] May 19, 2022
c3be243
resolve conflicts
neilkakkar May 20, 2022
fa1a258
Update snapshots
github-actions[bot] May 20, 2022
448dcbb
Update snapshots
github-actions[bot] May 20, 2022
4b09692
Update snapshots
github-actions[bot] May 20, 2022
e5305cc
Update snapshots
github-actions[bot] May 20, 2022
9027ee3
address comments
neilkakkar May 20, 2022
34f9962
Update snapshots
github-actions[bot] May 20, 2022
d4ea909
fix(person-on-events): trigger test
EDsCODE May 20, 2022
c3ad9ba
Merge branch 'person-on-events-trends' of github.com:PostHog/posthog …
EDsCODE May 20, 2022
d053860
Merge branch 'master' into person-on-events-trends
EDsCODE May 21, 2022
37 changes: 17 additions & 20 deletions ee/clickhouse/models/event.py
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@

from ee.clickhouse.models.element import chain_to_elements, elements_to_string
from ee.clickhouse.sql.events import BULK_INSERT_EVENT_SQL, GET_EVENTS_BY_TEAM_SQL, INSERT_EVENT_SQL
from posthog.client import sync_execute
from posthog.client import query_with_columns, sync_execute
from posthog.models.element import Element
from posthog.models.person import Person
from posthog.models.team import Team
@@ -108,7 +108,8 @@ def bulk_create_events(events: List[Dict[str, Any]]):


def get_events_by_team(team_id: Union[str, int]):
events = sync_execute(GET_EVENTS_BY_TEAM_SQL, {"team_id": str(team_id)})

events = query_with_columns(GET_EVENTS_BY_TEAM_SQL, {"team_id": str(team_id)})
return ClickhouseEventSerializer(events, many=True, context={"elements": None, "people": None}).data


@@ -143,34 +144,30 @@ class ClickhouseEventSerializer(serializers.Serializer):
elements_chain = serializers.SerializerMethodField()

def get_id(self, event):
return str(event[0])
return str(event["uuid"])

def get_distinct_id(self, event):
return event[5]
return event["distinct_id"]

def get_properties(self, event):
if len(event) >= 10 and event[8] and event[9]:
prop_vals = [res.strip('"') for res in event[9]]
return dict(zip(event[8], prop_vals))
else:
# parse_constants gets called for any NaN, Infinity etc values
# we just want those to be returned as None
props = json.loads(event[2], parse_constant=lambda x: None)
unpadded = {key: value.strip('"') if isinstance(value, str) else value for key, value in props.items()}
return unpadded
# parse_constants gets called for any NaN, Infinity etc values
# we just want those to be returned as None
props = json.loads(event["properties"], parse_constant=lambda x: None)
unpadded = {key: value.strip('"') if isinstance(value, str) else value for key, value in props.items()}
return unpadded

def get_event(self, event):
return event[1]
return event["event"]

def get_timestamp(self, event):
dt = event[3].replace(tzinfo=timezone.utc)
dt = event["timestamp"].replace(tzinfo=timezone.utc)
return dt.astimezone().isoformat()

def get_person(self, event):
if not self.context.get("people") or event[5] not in self.context["people"]:
if not self.context.get("people") or event["distinct_id"] not in self.context["people"]:
return None

person = self.context["people"][event[5]]
person = self.context["people"][event["distinct_id"]]
return {
"is_identified": person.is_identified,
"distinct_ids": person.distinct_ids[:1], # only send the first one to avoid a payload bloat
@@ -180,12 +177,12 @@ def get_person(self, event):
}

def get_elements(self, event):
if not event[6]:
if not event["elements_chain"]:
return []
return ElementSerializer(chain_to_elements(event[6]), many=True).data
return ElementSerializer(chain_to_elements(event["elements_chain"]), many=True).data

def get_elements_chain(self, event):
return event[6]
return event["elements_chain"]


def determine_event_conditions(
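The serializer changes above swap positional tuple indexing (`event[0]`, `event[5]`, …) for named dict keys, matching the dict rows that `query_with_columns` now returns. A minimal sketch of the difference, using a hypothetical event row (field values are illustrative, not from the PR):

```python
from datetime import datetime, timezone

# Hypothetical row as sync_execute returned it: a bare tuple,
# readable only if you memorise the column order.
row_tuple = ("0181-uuid", "$pageview", "{}", datetime.now(timezone.utc), 2, "user-1", "")

# The same row as query_with_columns returns it: keyed by column name,
# so serializer code like event["distinct_id"] is self-documenting.
row_dict = {
    "uuid": "0181-uuid",
    "event": "$pageview",
    "properties": "{}",
    "timestamp": datetime.now(timezone.utc),
    "team_id": 2,
    "distinct_id": "user-1",
    "elements_chain": "",
}

# Old access vs. new access refer to the same field.
assert row_tuple[5] == row_dict["distinct_id"]
```

This is why the fallback `len(event) >= 10` branch in `get_properties` could be dropped: a dict row either has the key or it doesn't, with no positional padding to check.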
4 changes: 2 additions & 2 deletions ee/clickhouse/models/test/test_filters.py
@@ -6,7 +6,7 @@
from ee.clickhouse.sql.events import GET_EVENTS_WITH_PROPERTIES
from ee.clickhouse.test.test_journeys import journeys_for
from ee.clickhouse.util import ClickhouseTestMixin
from posthog.client import sync_execute
from posthog.client import query_with_columns, sync_execute
from posthog.constants import FILTER_TEST_ACCOUNTS
from posthog.models import Element, Organization, Person, Team
from posthog.models.cohort import Cohort
@@ -24,7 +24,7 @@ def _filter_events(filter: Filter, team: Team, order_by: Optional[str] = None):
)
params = {"team_id": team.pk, **prop_filter_params}

events = sync_execute(
events = query_with_columns(
GET_EVENTS_WITH_PROPERTIES.format(
filters=prop_filters, order_by="ORDER BY {}".format(order_by) if order_by else ""
),
20 changes: 1 addition & 19 deletions ee/clickhouse/system_status.py
@@ -13,7 +13,7 @@

from ee.clickhouse.models.event import get_event_count, get_event_count_for_last_month, get_event_count_month_to_date
from posthog.api.dead_letter_queue import get_dead_letter_queue_events_last_24h, get_dead_letter_queue_size
from posthog.client import make_ch_pool, sync_execute
from posthog.client import make_ch_pool, query_with_columns, sync_execute
from posthog.settings import CLICKHOUSE_PASSWORD, CLICKHOUSE_STABLE_HOST, CLICKHOUSE_USER

SLOW_THRESHOLD_MS = 10000
@@ -168,24 +168,6 @@ def get_clickhouse_slow_log() -> List[Dict]:
)


def query_with_columns(query, args=None, columns_to_remove=[]) -> List[Dict]:
metrics, types = sync_execute(query, args, with_column_types=True)
type_names = [key for key, _type in types]

rows = []
for row in metrics:
result = {}
for type_name, value in zip(type_names, row):
if isinstance(value, list):
value = ", ".join(map(str, value))
if type_name not in columns_to_remove:
result[type_name] = value

rows.append(result)

return rows


def analyze_query(query: str):
random_id = str(uuid.uuid4())

16 changes: 10 additions & 6 deletions posthog/api/event.py
@@ -27,7 +27,7 @@
)
from posthog.api.documentation import PropertiesSerializer, extend_schema
from posthog.api.routing import StructuredViewSetMixin
from posthog.client import sync_execute
from posthog.client import query_with_columns, sync_execute
from posthog.models import Element, Filter, Person
from posthog.models.action import Action
from posthog.models.action.util import format_action_filter
@@ -120,12 +120,12 @@ def list(self, request: request.Request, *args: Any, **kwargs: Any) -> response.

next_url: Optional[str] = None
if not is_csv_request and len(query_result) > limit:
next_url = self._build_next_url(request, query_result[limit - 1][3])
next_url = self._build_next_url(request, query_result[limit - 1]["timestamp"])

return response.Response({"next": next_url, "results": result})

def _get_people(self, query_result: List[Dict], team: Team) -> Dict[str, Any]:
distinct_ids = [event[5] for event in query_result]
distinct_ids = [event["distinct_id"] for event in query_result]
persons = get_persons_by_distinct_ids(team.pk, distinct_ids)
persons = persons.prefetch_related(Prefetch("persondistinctid_set", to_attr="distinct_ids_cache"))
distinct_to_person: Dict[str, Person] = {}
@@ -137,6 +137,7 @@ def _get_people(self, query_result: List[Dict], team: Team) -> Dict[str, Any]:
def _query_events_list(
self, filter: Filter, team: Team, request: request.Request, long_date_from: bool = False, limit: int = 100
) -> List:

limit += 1
limit_sql = "LIMIT %(limit)s"
order = "DESC" if self._parse_order_by(self.request)[0] == "-timestamp" else "ASC"
@@ -166,26 +167,29 @@ def _query_events_list(
prop_filter_params = {**prop_filter_params, **params}

if prop_filters != "":
return sync_execute(
return query_with_columns(
SELECT_EVENT_BY_TEAM_AND_CONDITIONS_FILTERS_SQL.format(
conditions=conditions, limit=limit_sql, filters=prop_filters, order=order
),
{"team_id": team.pk, "limit": limit, **condition_params, **prop_filter_params},
)
else:
return sync_execute(
return query_with_columns(
SELECT_EVENT_BY_TEAM_AND_CONDITIONS_SQL.format(conditions=conditions, limit=limit_sql, order=order),
{"team_id": team.pk, "limit": limit, **condition_params},
)

def retrieve(
self, request: request.Request, pk: Optional[Union[int, str]] = None, *args: Any, **kwargs: Any
) -> response.Response:

if not isinstance(pk, str) or not UUIDT.is_valid_uuid(pk):
return response.Response(
{"detail": "Invalid UUID", "code": "invalid", "type": "validation_error",}, status=400
)
query_result = sync_execute(SELECT_ONE_EVENT_SQL, {"team_id": self.team.pk, "event_id": pk.replace("-", "")})
query_result = query_with_columns(
SELECT_ONE_EVENT_SQL, {"team_id": self.team.pk, "event_id": pk.replace("-", "")}
)
if len(query_result) == 0:
raise NotFound(detail=f"No events exist for event UUID {pk}")

33 changes: 27 additions & 6 deletions posthog/api/test/test_event.py
@@ -571,18 +571,39 @@ def test_get_events_with_specified_token(self):
response_invalid_token = self.client.get(f"/api/projects/{self.team.id}/events?token=invalid")
self.assertEqual(response_invalid_token.status_code, 401)

@patch("posthog.api.event.sync_execute")
def test_optimize_query(self, patch_sync_execute):
@patch("posthog.api.event.query_with_columns")
def test_optimize_query(self, patch_query_with_columns):
#  For ClickHouse we normally only query the last day,
# but if a user doesn't have many events we still want to return events that are older
patch_sync_execute.return_value = [("event", "d", "{}", timezone.now(), "d", "d", "d")]
patch_query_with_columns.return_value = [
{
"uuid": "event",
"event": "d",
"properties": "{}",
"timestamp": timezone.now(),
"team_id": "d",
"distinct_id": "d",
"elements_chain": "d",
}
]
response = self.client.get(f"/api/projects/{self.team.id}/events/").json()
self.assertEqual(len(response["results"]), 1)
self.assertEqual(patch_sync_execute.call_count, 2)
self.assertEqual(patch_query_with_columns.call_count, 2)

patch_sync_execute.return_value = [("event", "d", "{}", timezone.now(), "d", "d", "d") for _ in range(0, 100)]
patch_query_with_columns.return_value = [
{
"uuid": "event",
"event": "d",
"properties": "{}",
"timestamp": timezone.now(),
"team_id": "d",
"distinct_id": "d",
"elements_chain": "d",
}
for _ in range(0, 100)
]
response = self.client.get(f"/api/projects/{self.team.id}/events/").json()
self.assertEqual(patch_sync_execute.call_count, 3)
self.assertEqual(patch_query_with_columns.call_count, 3)

def test_filter_events_by_being_after_properties_with_date_type(self):
journeys_for(
18 changes: 18 additions & 0 deletions posthog/client.py
@@ -175,6 +175,24 @@ def sync_execute(query, args=None, settings=None, with_column_types=False, flush
return result


def query_with_columns(query, args=None, columns_to_remove=[]) -> List[Dict]:
metrics, types = sync_execute(query, args, with_column_types=True)
type_names = [key for key, _type in types]

rows = []
for row in metrics:
result = {}
for type_name, value in zip(type_names, row):
if isinstance(value, list):
value = ", ".join(map(str, value))
if type_name not in columns_to_remove:
result[type_name] = value

rows.append(result)

return rows


REDIS_STATUS_TTL = 600 # 10 minutes


Expand Down