Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 63 additions & 13 deletions v03_pipeline/lib/misc/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,9 @@ def for_dataset_type(


class ClickHouseMaterializedView(StrEnum):
CLINVAR_ALL_VARIANTS_TO_CLINVAR_MV = 'clinvar_all_variants_to_clinvar_mv'
ENTRIES_TO_PROJECT_GT_STATS_MV = 'entries_to_project_gt_stats_mv'
PROJECT_GT_STATS_TO_GT_STATS_MV = 'project_gt_stats_to_gt_stats_mv'

@classmethod
def for_dataset_type_refreshable(cls, dataset_type: DatasetType):
if dataset_type in {DatasetType.SV, DatasetType.GCNV}:
return []
return [ClickHouseMaterializedView.CLINVAR_ALL_VARIANTS_TO_CLINVAR_MV]

@classmethod
def for_dataset_type_atomic_entries_update(
cls,
Expand Down Expand Up @@ -234,6 +227,36 @@ def src_table(self, clickhouse_table: ClickHouseTable):
return f"file('{path}', 'Parquet')"


class ClickhouseReferenceData(StrEnum):
CLINVAR = 'clinvar'

@classmethod
def for_dataset_type(cls, dataset_type: DatasetType):
if dataset_type in {DatasetType.SV, DatasetType.GCNV}:
return []
return [ClickhouseReferenceData.CLINVAR]

def search_is_join_table(self):
return self == ClickhouseReferenceData.CLINVAR

def all_variants_path(self, table_name_builder: TableNameBuilder) -> str:
return (
f'{table_name_builder.dst_prefix}/reference_data/{self.value}/all_variants`'
)

def seqr_variants_path(self, table_name_builder: TableNameBuilder) -> str:
return f'{table_name_builder.dst_prefix}/reference_data/{self.value}/seqr_variants`'

def search_path(self, table_name_builder: TableNameBuilder) -> str:
return f'{table_name_builder.dst_prefix}/reference_data/{self.value}`'

def seqr_variants_to_search_mv_path(
self,
table_name_builder: TableNameBuilder,
) -> str:
return f'{table_name_builder.dst_prefix}/reference_data/{self.value}/seqr_variants_to_search_mv`'


def logged_query(query, params=None, timeout: int | None = None):
client = get_clickhouse_client(timeout)
sanitized_query = query
Expand Down Expand Up @@ -759,12 +782,39 @@ def load_complete_run(
project_guids=project_guids,
family_guids=family_guids,
)
refresh_materialized_views(
table_name_builder,
ClickHouseMaterializedView.for_dataset_type_refreshable(
dataset_type,
),
)
for clickhouse_reference_data in ClickhouseReferenceData.for_dataset_type(
dataset_type,
):
logged_query(
f"""
INSERT INTO {clickhouse_reference_data.seqr_variants_path(table_name_builder)}
SELECT
DISTINCT ON (key)
dst.key as key,
COLUMNS('.*') EXCEPT(version, variantId, key)
FROM {clickhouse_reference_data.all_variants_path(table_name_builder)} src
INNER JOIN {table_name_builder.dst_table(ClickHouseTable.KEY_LOOKUP)} dst
ON {ClickHouseTable.KEY_LOOKUP.join_condition}
""",
)
if clickhouse_reference_data.search_is_join_table:
logged_query(
f"""
SYSTEM REFRESH VIEW {clickhouse_reference_data.seqr_variants_to_search_mv_path(table_name_builder)}
""",
)
logged_query(
f"""
SYSTEM WAIT VIEW {clickhouse_reference_data.seqr_variants_to_search_mv_path(table_name_builder)}
""",
timeout=300,
)
else:
logged_query(
f"""
SYSTEM RELOAD DICTIONARY {clickhouse_reference_data.search_path(table_name_builder)}
""",
)


@retry()
Expand Down
32 changes: 29 additions & 3 deletions v03_pipeline/lib/misc/clickhouse_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,36 @@ def setUp(self):
)
client.execute(
f"""
CREATE MATERIALIZED VIEW {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/clinvar_all_variants_to_clinvar_mv`
CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/reference_data/clinvar/all_variants` (
`variantId` String,
`alleleId` Nullable(UInt32),
`pathogenicity` Enum8(
'Pathogenic' = 0,
'Pathogenic/Likely_pathogenic' = 1
)
)
PRIMARY KEY `variantId`
""",
)
client.execute(
f"""
CREATE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/reference_data/clinvar/seqr_variants` (
`key` UInt32,
`alleleId` Nullable(UInt32),
`pathogenicity` Enum8(
'Pathogenic' = 0,
'Pathogenic/Likely_pathogenic' = 1
)
)
PRIMARY KEY `key`
""",
)
client.execute(
f"""
CREATE MATERIALIZED VIEW {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/reference_data/clinvar/seqr_variants_to_search_mv`
REFRESH EVERY 10 YEAR ENGINE = Null
AS SELECT *
FROM {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/key_lookup`
AS SELECT DISTINCT ON (key) *
FROM {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/reference_data/clinvar/seqr_variants`
""",
)
client.execute(
Expand Down