Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
from .common.config import Settings
from .common.dependencies import get_settings

SettingsDep = Annotated[Settings, Depends(get_settings)]

def get_clickhouse_session(settings: Annotated[Settings, Depends(get_settings)]):

def get_clickhouse_session(settings: SettingsDep):
    """FastAPI dependency yielding a Clickhouse connection built from
    `settings.clickhouse_url`; the connection is always closed when the
    request scope ends."""
    session = Clickhouse.from_url(settings.clickhouse_url)
    try:
        yield session
    finally:
        # Runs even if the request handler raised.
        session.disconnect()


ClickhouseDep = Annotated[Clickhouse, Depends(get_clickhouse_session)]
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from enum import Enum
import time
import math
from datetime import datetime
from typing import List, Literal, Optional, Tuple, Union, Dict
from ...common.clickhouse_utils import query_click
from typing import Any, List, Literal, Optional, Self, Tuple, Dict
from typing_extensions import Annotated
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from pydantic import BaseModel, Field
import sqlalchemy as sql

from .utils import get_measurement_start_day_agg, TimeGrains, parse_probe_asn_to_int
from ...dependencies import (
get_clickhouse_session,
)
from ...utils.api import ProbeCCOrNone, ProbeASNOrNone
from ...dependencies import get_clickhouse_session, ClickhouseDep
from .list_analysis import (
SinceUntil,
utc_30_days_ago,
Expand Down Expand Up @@ -279,6 +281,10 @@ def format_aggregate_query(extra_cols: Dict[str, str], where: str):
)
"""

def nan_to_none(val):
    """Normalize a ClickHouse float value: NaN (or missing/None) becomes None.

    Nullable(Float*) columns can surface missing data either as None or as a
    float NaN; callers (e.g. ``ChangePointEntry.from_row``) pass ``row.get(...)``
    straight in, so both cases must map to None.

    :param val: a float, an int, or None
    :return: None when ``val`` is None or NaN, otherwise ``val`` unchanged
    """
    # Check None first: math.isnan(None) raises TypeError.
    if val is None or math.isnan(val):
        return None
    return val

@router.get(
"/v1/aggregation/analysis",
Expand All @@ -292,8 +298,8 @@ async def get_aggregation_analysis(
test_name: Annotated[Optional[str], Query()] = None,
domain: Annotated[Optional[str], Query()] = None,
input: Annotated[Optional[str], Query()] = None,
probe_asn: Annotated[Union[int, str, None], Query()] = None,
probe_cc: Annotated[Optional[str], Query(min_length=2, max_length=2)] = None,
probe_asn: ProbeASNOrNone = None,
probe_cc: ProbeCCOrNone = None,
ooni_run_link_id: Annotated[Optional[str], Query()] = None,
since: SinceUntil = utc_30_days_ago(),
until: SinceUntil = utc_today(),
Expand Down Expand Up @@ -397,10 +403,6 @@ async def get_aggregation_analysis(
d = dict(zip(list(extra_cols.keys()) + fixed_cols, row))
blocked_max_protocol = d["blocked_max_protocol"]

def nan_to_none(val):
if math.isnan(val):
return None
return val

loni = Loni(
dns_blocked=nan_to_none(d["dns_blocked"]),
Expand Down Expand Up @@ -447,3 +449,152 @@ def nan_to_none(val):
dimension_count=dimension_count,
results=results,
)


class ChangeDir(str, Enum):
up = "up"
down = "down"

@classmethod
def from_n_or_i(cls, i: int | None) -> Self | None:
if i is None:
return None

return cls("down") if i == -1 else cls("up")


class ChangePointEntry(BaseModel):
    """One changepoint row from the 'event_detector_changepoints' table.

    TODO: double check which fields are actually necessary.
    """

    probe_asn: int
    probe_cc: str
    domain: str
    start_time: datetime  # TODO double check the naming of these datetime fields
    end_time: datetime
    count_isp_resolver: int
    count_other_resolver: int
    count: int
    dns_isp_blocked: float | None
    dns_other_blocked: float | None
    tcp_blocked: float | None
    tls_blocked: float | None
    dns_isp_blocked_obs_w_sum: float | None
    dns_isp_blocked_w_sum: float | None
    dns_isp_blocked_s_pos: float | None
    dns_isp_blocked_s_neg: float | None
    dns_other_blocked_obs_w_sum: float | None
    dns_other_blocked_w_sum: float | None
    dns_other_blocked_s_pos: float | None
    dns_other_blocked_s_neg: float | None
    tcp_blocked_obs_w_sum: float | None
    tcp_blocked_w_sum: float | None
    tcp_blocked_s_pos: float | None
    tcp_blocked_s_neg: float | None
    tls_blocked_obs_w_sum: float | None
    tls_blocked_w_sum: float | None
    tls_blocked_s_pos: float | None
    tls_blocked_s_neg: float | None
    change_dir: ChangeDir | None = Field(
        description="If blocking behaviour goes up or down"
    )
    s_pos: float | None
    s_neg: float | None
    current_mean: float | None
    h: float | None

    @classmethod
    def from_row(cls, row: Dict[str, Any]) -> Self:
        """Build a ChangePointEntry from a raw row of the clickhouse table
        'event_detector_changepoints'.

        Float columns pass through nan_to_none so NaN sentinels become None;
        'ts'/'last_ts' are renamed to start_time/end_time; the integer
        'change_dir' column is decoded via ChangeDir.from_n_or_i.
        """
        # Columns copied verbatim under the same field name.
        verbatim = (
            "probe_asn",
            "probe_cc",
            "domain",
            "count_isp_resolver",
            "count_other_resolver",
            "count",
        )
        # Float columns that need NaN -> None normalization.
        float_cols = (
            "dns_isp_blocked",
            "dns_other_blocked",
            "tcp_blocked",
            "tls_blocked",
            "dns_isp_blocked_obs_w_sum",
            "dns_isp_blocked_w_sum",
            "dns_isp_blocked_s_pos",
            "dns_isp_blocked_s_neg",
            "dns_other_blocked_obs_w_sum",
            "dns_other_blocked_w_sum",
            "dns_other_blocked_s_pos",
            "dns_other_blocked_s_neg",
            "tcp_blocked_obs_w_sum",
            "tcp_blocked_w_sum",
            "tcp_blocked_s_pos",
            "tcp_blocked_s_neg",
            "tls_blocked_obs_w_sum",
            "tls_blocked_w_sum",
            "tls_blocked_s_pos",
            "tls_blocked_s_neg",
            "s_pos",
            "s_neg",
            "current_mean",
            "h",
        )

        kwargs: Dict[str, Any] = {name: row.get(name) for name in verbatim}
        kwargs.update({name: nan_to_none(row.get(name)) for name in float_cols})
        kwargs["start_time"] = row.get("ts")
        kwargs["end_time"] = row.get("last_ts")
        kwargs["change_dir"] = ChangeDir.from_n_or_i(row.get("change_dir"))
        return cls(**kwargs)  # type: ignore


class ListChangePointsResponse(BaseModel):
    """Response body for GET /v1/detector/changepoints: the list of
    changepoints matching the query filters."""

    results: List[ChangePointEntry]


@router.get(
    "/v1/detector/changepoints",
    tags=["detector"],
    description="List changepoints detected by the event detector using the cusum algorithm",
    response_model=ListChangePointsResponse,
)
@parse_probe_asn_to_int
async def list_changepoints(
    clickhouse: ClickhouseDep,
    probe_asn: ProbeASNOrNone = None,
    probe_cc: ProbeCCOrNone = None,
    domain: str | None = Query(default=None),
    since: SinceUntil = utc_30_days_ago(),
    until: SinceUntil = utc_today(),
) -> ListChangePointsResponse:
    """Query 'event_detector_changepoints', optionally filtered by ASN,
    country code and domain, always bounded by the [since, until] window."""
    where_clauses = []
    params = {}

    # Optional equality filters, applied only when the caller supplied a value.
    optional_filters = (
        ("probe_asn", probe_asn, "probe_asn = :probe_asn"),
        ("probe_cc", probe_cc, "probe_cc = :probe_cc"),
        ("domain", domain, "domain = :domain"),  # TODO should this be 'like %domain%'?
    )
    for name, value, clause in optional_filters:
        if value:
            where_clauses.append(sql.text(clause))
            params[name] = value

    # Time window is always applied.
    where_clauses.append(sql.text("ts >= :since"))
    params["since"] = since
    where_clauses.append(sql.text("ts <= :until"))
    params["until"] = until

    table = sql.table("event_detector_changepoints")
    stmt = sql.select("*").select_from(table).where(sql.and_(*where_clauses))

    rows = query_click(clickhouse, stmt, params)

    return ListChangePointsResponse(
        results=[ChangePointEntry.from_row(row) for row in rows]
    )
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ def report_id_validator(cls, report_id: str) -> str:

return report_id


def validate_report_id(report_id: str) -> str:
if len(report_id) < 15 or len(report_id) > 100:
raise HTTPException(
Expand All @@ -483,6 +484,7 @@ def validate_report_id(report_id: str) -> str:

return report_id


@router.get("/v1/measurement_meta", response_model_exclude_unset=True)
async def get_measurement_meta(
response: Response,
Expand All @@ -505,7 +507,7 @@ async def get_measurement_meta(
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing measurement_uid or report_id. You should provide at the least one"
detail="Missing measurement_uid or report_id. You should provide at the least one",
)

if msmt_meta.probe_asn is not None and isinstance(msmt_meta.probe_asn, str):
Expand Down Expand Up @@ -1019,6 +1021,7 @@ def get_bucket_url(bucket_name: str) -> str:
def asn_to_int(asn_str: str) -> int:
    """Convert an ASN string such as "AS1234" (or a bare "1234") to an int.

    Uses removeprefix rather than strip("AS"): str.strip treats its argument
    as a character set, so strip("AS") also eats trailing/leading 'A'/'S'
    characters (e.g. "123S" -> 123), silently accepting malformed input.

    :raises ValueError: if the remainder is not a valid integer literal
    """
    return int(asn_str.removeprefix("AS"))


def is_in_charset(s: str, charset: str, error_msg: str):
"""Ensure `s` contains only valid characters listed in `charset`"""
for c in s:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
Utility functions and types to assist API development
"""

from typing import Annotated, Optional, Union
from fastapi import Query


ProbeCCOrNone = Annotated[Optional[str], Query(min_length=2, max_length=2)]
ProbeASNOrNone = Annotated[Union[int, str, None], Query()]
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ENGINE = ReplacingMergeTree
ORDER BY (measurement_start_time, report_id, input)
SETTINGS index_granularity = 8192;

CREATE TABLE IF NOT EXISTS default.citizenlab
CREATE TABLE IF NOT EXISTS default.citizenlab
(
`domain` String,
`url` String,
Expand All @@ -64,3 +64,44 @@ CREATE TABLE IF NOT EXISTS default.jsonl
ENGINE = MergeTree
ORDER BY (report_id, input)
SETTINGS index_granularity = 8192;

-- Changepoints emitted by the event detector (cusum algorithm): one row per
-- detected change in a (probe_asn, probe_cc, domain) measurement time series.
-- Nullable(Float32) columns may also carry NaN; the API layer normalizes both
-- to null (see nan_to_none in the aggregation router).
CREATE TABLE IF NOT EXISTS default.event_detector_changepoints
(
    `probe_asn` UInt32,
    `probe_cc` String,
    `domain` String,
    -- start of the changepoint window; `last_ts` below is its end
    `ts` DateTime64(3, 'UTC'),
    `count_isp_resolver` Nullable(UInt32),
    `count_other_resolver` Nullable(UInt32),
    `count` Nullable(UInt32),
    `dns_isp_blocked` Nullable(Float32),
    `dns_other_blocked` Nullable(Float32),
    `tcp_blocked` Nullable(Float32),
    `tls_blocked` Nullable(Float32),
    `last_ts` DateTime64(3, 'UTC'),
    -- per-protocol cusum accumulators (weighted sums and positive/negative scores)
    `dns_isp_blocked_obs_w_sum` Nullable(Float32),
    `dns_isp_blocked_w_sum` Nullable(Float32),
    `dns_isp_blocked_s_pos` Nullable(Float32),
    `dns_isp_blocked_s_neg` Nullable(Float32),
    `dns_other_blocked_obs_w_sum` Nullable(Float32),
    `dns_other_blocked_w_sum` Nullable(Float32),
    `dns_other_blocked_s_pos` Nullable(Float32),
    `dns_other_blocked_s_neg` Nullable(Float32),
    `tcp_blocked_obs_w_sum` Nullable(Float32),
    `tcp_blocked_w_sum` Nullable(Float32),
    `tcp_blocked_s_pos` Nullable(Float32),
    `tcp_blocked_s_neg` Nullable(Float32),
    `tls_blocked_obs_w_sum` Nullable(Float32),
    `tls_blocked_w_sum` Nullable(Float32),
    `tls_blocked_s_pos` Nullable(Float32),
    `tls_blocked_s_neg` Nullable(Float32),
    -- -1 = blocking going down; other values decoded as "up" by the API
    -- (ChangeDir.from_n_or_i). NOTE(review): 0 also maps to "up" — confirm intended.
    `change_dir` Nullable(Int8),
    `s_pos` Nullable(Float32),
    `s_neg` Nullable(Float32),
    `current_mean` Nullable(Float32),
    `h` Nullable(Float32)
)
ENGINE = MergeTree
-- monthly partitions on the changepoint start time
PARTITION BY toYYYYMM(ts)
ORDER BY (probe_asn, probe_cc, ts, domain)
SETTINGS index_granularity = 8192;
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Seed rows for default.event_detector_changepoints (presumably dev/test
-- fixture data — confirm). Column order must match the table DDL; the 'nan'
-- literals become float NaN values in the Nullable(Float32) columns.
INSERT INTO default.event_detector_changepoints VALUES
(945,'US','www.facebook.com','2024-01-15 18:00:00.000',0,2,2,nan,0,0.75,0,'2024-01-10 23:00:00.000',nan,0,0,0,0,373,0,0,1.5,373,0,0,0,373,0,0,-1,3.6899884,0,0.04597198,3.5),
(15169,'VE','google.com','2024-01-25 23:00:00.000',6,0,6,0,nan,0,0.7,'2024-01-10 23:00:00.000',nan,660,0,0,nan,2,0,0,0,662,0,0,42.9,662,0,0,-1,3.6650198,0,0.084454976,3.5),
(8346,'SN','www.tiktok.com','2024-01-31 15:00:00.000',2,0,2,0,nan,0,0.75,'2024-01-10 23:00:00.000',nan,660,0,0,nan,2,0,0,0,662,0,0,42.9,662,0,0,1,3.7752855,0,0.098714285,3.5),
(8767,'DE','preview.redd.it','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,56,0,0,nan,26,0,0,0,82,0,0,0,82,0,0,-1,3.5091832,0,0.08009709,3.5),
(8767,'DE','twitter.com','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,132,0,0,nan,37,0,0,0,169,0,0,0,169,0,0,1,3.7883966,0,0.03966346,3.5),
(8767,'DE','www.facebook.com','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,141,0,0,nan,37,0,0,0,178,0,0,0,178,0,0,1,3.800651,0,0.037844036,3.5),
(8767,'DE','external.xx.fbcdn.net','2024-01-14 22:00:00.000',2,0,2,0,nan,0.75,0,'2024-01-09 01:00:00.000',nan,37,0,0,nan,13,0,0,0,50,0,0,0,50,0,0,1,3.5141737,0,0.13356164,3.5),
(12668,'RU','www.facebook.com','2024-01-20 19:00:00.000',0,1,1,nan,0,0,0.7,'2024-01-10 19:00:00.000',nan,0,0,0,0,184,0,0,0,184,0,0,48.3,184,0,0,1,3.500514,0,0.31609195,3.5),
(8048,'VE','amazon.com','2024-01-29 22:00:00.000',0,1,1,nan,0,0.75,0,'2024-01-10 20:00:00.000',nan,1,0,0,nan,213,0,0,19.5,214,0,0,0,214,0,0,1,3.647134,0,0.1875,3.5),
(8048,'VE','google.com','2024-01-23 04:00:00.000',0,1,1,nan,0,0,0.7,'2024-01-10 21:00:00.000',nan,16,0,0,nan,17,0,0,0,33,0,0,0,33,0,0,-1,3.6120336,0,0.19636363,3.5)
Loading