feat(replays): initial replays clickhouse migration (#2681)
* replays storage

* change sharding key to hashed replay id

* add todo for bloom filter

* remove errant file

* add materialized hash column and bloom filter index
JoshFerge authored Jun 13, 2022
1 parent 02dbd05 commit 24bc476
Showing 6 changed files with 125 additions and 1 deletion.
1 change: 1 addition & 0 deletions snuba/clusters/storage_sets.py
@@ -31,6 +31,7 @@ class StorageSetKey(Enum):
    ERRORS_V2 = "errors_v2"
    ERRORS_V2_RO = "errors_v2_ro"
    PROFILES = "profiles"
    REPLAYS = "replays"
    GENERIC_METRICS_SETS = "generic_metrics_sets"


13 changes: 13 additions & 0 deletions snuba/migrations/groups.py
@@ -17,6 +17,7 @@ class MigrationGroup(Enum):
    SESSIONS = "sessions"
    QUERYLOG = "querylog"
    PROFILES = "profiles"
    REPLAYS = "replays"
    GENERIC_METRICS = "generic_metrics"


@@ -26,6 +27,7 @@ class MigrationGroup(Enum):
    MigrationGroup.SESSIONS,
    MigrationGroup.QUERYLOG,
    MigrationGroup.PROFILES,
    MigrationGroup.REPLAYS,
    MigrationGroup.GENERIC_METRICS,
}

@@ -158,6 +160,16 @@ def get_migrations(self) -> Sequence[str]:
        ]


class ReplaysLoader(DirectoryLoader):
    def __init__(self) -> None:
        super().__init__("snuba.migrations.snuba_migrations.replays")

    def get_migrations(self) -> Sequence[str]:
        return [
            "0001_replays",
        ]


class MetricsLoader(DirectoryLoader):
    def __init__(self) -> None:
        super().__init__("snuba.migrations.snuba_migrations.metrics")
@@ -243,6 +255,7 @@ def get_migrations(self) -> Sequence[str]:
    MigrationGroup.SESSIONS: SessionsLoader(),
    MigrationGroup.QUERYLOG: QuerylogLoader(),
    MigrationGroup.PROFILES: ProfilesLoader(),
    MigrationGroup.REPLAYS: ReplaysLoader(),
    MigrationGroup.GENERIC_METRICS: GenericMetricsLoader(),
}

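For orientation, a minimal sketch of how the new group is expected to resolve through the mapping above; it assumes a lookup helper named get_group_loader in snuba.migrations.groups, which is not shown in this diff:

# Sketch only: get_group_loader is an assumed helper; the diff itself only
# extends the MigrationGroup enum, registers ReplaysLoader, and points it at
# the snuba.migrations.snuba_migrations.replays package.
from snuba.migrations.groups import MigrationGroup, get_group_loader

loader = get_group_loader(MigrationGroup.REPLAYS)
print(loader.get_migrations())  # expected: ["0001_replays"]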
108 changes: 108 additions & 0 deletions snuba/migrations/snuba_migrations/replays/0001_replays.py
@@ -0,0 +1,108 @@
from typing import Sequence

from snuba.clickhouse.columns import (
    UUID,
    Array,
    Column,
    DateTime,
    IPv4,
    IPv6,
    Nested,
    String,
    UInt,
)
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers

raw_columns: Sequence[Column[Modifiers]] = [
    Column("replay_id", UUID()),
    Column("sequence_id", UInt(16)),
    Column("trace_ids", Array(UUID())),
    Column(
        "_trace_ids_hashed",
        Array(
            UInt(64),
            Modifiers(materialized="arrayMap(t -> cityHash64(t), trace_ids)"),
        ),
    ),
    Column("title", String()),
    ### columns used by other sentry events
    Column("project_id", UInt(64)),
    # time columns
    Column("timestamp", DateTime()),
    # release/environment info
    Column("platform", String(Modifiers(low_cardinality=True))),
    Column("environment", String(Modifiers(nullable=True, low_cardinality=True))),
    Column("release", String(Modifiers(nullable=True))),
    Column("dist", String(Modifiers(nullable=True))),
    Column("ip_address_v4", IPv4(Modifiers(nullable=True))),
    Column("ip_address_v6", IPv6(Modifiers(nullable=True))),
    # user columns
    Column("user", String()),
    Column("user_hash", UInt(64)),
    Column("user_id", String(Modifiers(nullable=True))),
    Column("user_name", String(Modifiers(nullable=True))),
    Column("user_email", String(Modifiers(nullable=True))),
    # sdk info
    Column("sdk_name", String()),
    Column("sdk_version", String()),
    Column("tags", Nested([("key", String()), ("value", String())])),
    # internal data
    Column("retention_days", UInt(16)),
    Column("partition", UInt(16)),
    Column("offset", UInt(64)),
]


class Migration(migration.ClickhouseNodeMigration):
    blocking = False

    def forwards_local(self) -> Sequence[operations.SqlOperation]:
        return [
            operations.CreateTable(
                storage_set=StorageSetKey.REPLAYS,
                table_name="replays_local",
                columns=raw_columns,
                engine=table_engines.ReplacingMergeTree(
                    storage_set=StorageSetKey.REPLAYS,
                    order_by="(project_id, toStartOfDay(timestamp), cityHash64(replay_id), sequence_id)",
                    partition_by="(retention_days, toMonday(timestamp))",
                    settings={"index_granularity": "8192"},
                    ttl="timestamp + toIntervalDay(retention_days)",
                ),
            ),
            operations.AddIndex(
                storage_set=StorageSetKey.REPLAYS,
                table_name="replays_local",
                index_name="bf_trace_ids_hashed",
                index_expression="_trace_ids_hashed",
                index_type="bloom_filter()",
                granularity=1,
            ),
        ]

    def backwards_local(self) -> Sequence[operations.SqlOperation]:
        return [
            operations.DropTable(
                storage_set=StorageSetKey.REPLAYS,
                table_name="replays_local",
            ),
        ]

    def forwards_dist(self) -> Sequence[operations.SqlOperation]:
        return [
            operations.CreateTable(
                storage_set=StorageSetKey.REPLAYS,
                table_name="replays_dist",
                columns=raw_columns,
                engine=table_engines.Distributed(
                    local_table_name="replays_local",
                    sharding_key="cityHash64(toString(replay_id))",
                ),
            ),
        ]

    def backwards_dist(self) -> Sequence[operations.SqlOperation]:
        return [
            operations.DropTable(
                storage_set=StorageSetKey.REPLAYS, table_name="replays_dist"
            ),
        ]
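As a rough illustration of what the materialized _trace_ids_hashed column and the bf_trace_ids_hashed bloom filter index are for, the snippet below assembles the kind of trace-id lookup this layout is meant to serve. The table contents, project id, and trace id are hypothetical, and the query is only built as a string, not executed:

# Sketch only: a trace-id lookup against replays_local. The bloom filter over
# the materialized cityHash64 values lets ClickHouse skip granules that cannot
# contain the hashed trace id; the second has() on trace_ids itself removes
# any bloom filter false positives.
trace_id = "7ee00a4d-5f0b-4b8c-9e52-2f8d3a6b1c9d"  # hypothetical trace id

query = f"""
SELECT replay_id, sequence_id, timestamp
FROM replays_local
WHERE project_id = 42
  AND has(_trace_ids_hashed, cityHash64(toUUID('{trace_id}')))
  AND has(trace_ids, toUUID('{trace_id}'))
"""

print(query)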
0 changes: 0 additions & 0 deletions snuba/migrations/snuba_migrations/replays/__init__.py
Empty file.
3 changes: 2 additions & 1 deletion snuba/settings/__init__.py
@@ -51,6 +51,7 @@
"errors_v2",
"errors_v2_ro",
"profiles",
"replays",
"generic_metrics_sets",
},
"single_node": True,
@@ -168,7 +169,7 @@
COLUMN_SPLIT_MAX_RESULTS = 5000

# Migrations in skipped groups will not be run
SKIPPED_MIGRATION_GROUPS: Set[str] = {"querylog", "profiles"}
SKIPPED_MIGRATION_GROUPS: Set[str] = {"querylog", "profiles", "replays"}

MAX_RESOLUTION_FOR_JITTER = 60

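Because "replays" is added to SKIPPED_MIGRATION_GROUPS, the new group is opt-in and its migration will not run by default. A minimal sketch of how a development setup might re-enable it (the override module name and mechanism here are assumptions, not part of this commit):

# settings_local_replays.py -- hypothetical override module for local testing.
# It reuses the defaults and removes "replays" from the skipped set so that
# the 0001_replays migration is actually applied.
from typing import Set

from snuba.settings import *  # noqa: F401,F403
from snuba.settings import SKIPPED_MIGRATION_GROUPS as _DEFAULT_SKIPPED

SKIPPED_MIGRATION_GROUPS: Set[str] = _DEFAULT_SKIPPED - {"replays"}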
1 change: 1 addition & 0 deletions snuba/settings/settings_distributed.py
@@ -24,6 +24,7 @@
"errors_v2",
"errors_v2_ro",
"profiles",
"replays",
"generic_metrics_sets",
},
"single_node": False,
