
Commit

ref(subscriptions): Decouple subscription logic from datasets [INGEST-627] (#2244)

* ref(subscriptions): Decouple subscription logic from datasets

Decouples subscription creation and deletion logic from datasets so that
entities are also taken into account, since the dataset-to-entity relation
is not 1-to-1.

* Add test that ensures selected dataset is actually being used
ahmedetefy committed Dec 9, 2021
1 parent efab254 commit 7fb5a25
Showing 9 changed files with 149 additions and 67 deletions.
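
The net effect is that an explicit EntityKey now travels with every subscription operation instead of always being derived from the dataset's default entity. As a rough illustration (a sketch, not part of this diff), the new call sites look like the following; the metrics dataset is chosen only because its default entity (METRICS_SETS) is not the one counter subscriptions need, and the surrounding Redis/Kafka setup the real code paths rely on is omitted:

from snuba.datasets.entities import EntityKey
from snuba.datasets.factory import get_dataset
from snuba.subscriptions.data import PartitionId
from snuba.subscriptions.subscription import SubscriptionCreator, SubscriptionDeleter

dataset = get_dataset("metrics")

# Previously SubscriptionCreator(dataset) always resolved the dataset's
# default entity; now the caller chooses the entity explicitly.
creator = SubscriptionCreator(dataset, EntityKey.METRICS_COUNTERS)

# Deletion no longer needs the dataset at all: the entity key plus the
# partition identify the store that holds the subscription.
deleter = SubscriptionDeleter(EntityKey.METRICS_COUNTERS, PartitionId(0))
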
27 changes: 20 additions & 7 deletions snuba/cli/subscriptions.py
@@ -14,8 +14,13 @@
from arroyo.synchronized import SynchronizedConsumer

from snuba import environment, settings
from snuba.datasets.entities.factory import ENTITY_NAME_LOOKUP
from snuba.datasets.factory import DATASET_NAMES, enforce_table_writer, get_dataset
from snuba.datasets.entities import EntityKey
from snuba.datasets.entities.factory import (
ENTITY_NAME_LOOKUP,
enforce_table_writer,
get_entity,
)
from snuba.datasets.factory import DATASET_NAMES, get_dataset
from snuba.environment import setup_logging, setup_sentry
from snuba.redis import redis_client
from snuba.subscriptions.codecs import SubscriptionTaskResultEncoder
@@ -43,6 +48,9 @@
type=click.Choice(DATASET_NAMES),
help="The dataset to target",
)
@click.option(
"--entity", "entity_name", help="The entity to target",
)
@click.option("--topic")
@click.option("--partitions", type=int)
@click.option("--commit-log-topic")
@@ -104,6 +112,7 @@
def subscriptions(
*,
dataset_name: str,
entity_name: Optional[str],
topic: Optional[str],
partitions: Optional[int],
commit_log_topic: Optional[str],
@@ -129,15 +138,19 @@ def subscriptions(

dataset = get_dataset(dataset_name)

entity = dataset.get_default_entity()
entity_key = ENTITY_NAME_LOOKUP[entity]
if not entity_name:
entity = dataset.get_default_entity()
entity_key = ENTITY_NAME_LOOKUP[entity]
else:
entity_key = EntityKey(entity_name)
entity = get_entity(entity_key)

storage = dataset.get_default_entity().get_writable_storage()
storage = entity.get_writable_storage()
assert (
storage is not None
), f"Dataset {dataset_name} does not have a writable storage by default."
), f"Entity {entity_key} does not have a writable storage by default."

loader = enforce_table_writer(dataset).get_stream_loader()
loader = enforce_table_writer(entity).get_stream_loader()
commit_log_topic_spec = loader.get_commit_log_topic_spec()
assert commit_log_topic_spec is not None
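
Condensed into one helper (a sketch, not part of this commit), the entity resolution the command now performs is: an explicit --entity value takes precedence, otherwise the dataset's default entity is used.

from typing import Optional, Tuple

from snuba.datasets.entities import EntityKey
from snuba.datasets.entities.factory import ENTITY_NAME_LOOKUP, get_entity
from snuba.datasets.entity import Entity
from snuba.datasets.factory import get_dataset


def resolve_entity(dataset_name: str, entity_name: Optional[str]) -> Tuple[Entity, EntityKey]:
    # Mirrors the resolution logic added to the subscriptions command above.
    dataset = get_dataset(dataset_name)
    if not entity_name:
        entity = dataset.get_default_entity()
        entity_key = ENTITY_NAME_LOOKUP[entity]
    else:
        entity_key = EntityKey(entity_name)
        entity = get_entity(entity_key)
    return entity, entity_key
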

10 changes: 10 additions & 0 deletions snuba/datasets/entities/factory.py
@@ -3,6 +3,7 @@
from snuba import settings
from snuba.datasets.entities import EntityKey
from snuba.datasets.entity import Entity
from snuba.datasets.table_storage import TableWriter
from snuba.utils.serializable_exception import SerializableException


@@ -65,3 +66,12 @@ def get_entity(name: EntityKey) -> Entity:
raise InvalidEntityError(f"entity {name!r} does not exist") from error

return entity


def enforce_table_writer(entity: Entity) -> TableWriter:
writable_storage = entity.get_writable_storage()

assert (
writable_storage is not None
), f"Entity {ENTITY_NAME_LOOKUP[entity]} does not have a writable storage."
return writable_storage.get_table_writer()
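
Callers that previously used the dataset-level helper now resolve an entity first and pass it in. A short sketch of the relocated helper in use (not part of this commit), assuming an entity with a writable storage such as events:

from snuba.datasets.entities import EntityKey
from snuba.datasets.entities.factory import enforce_table_writer, get_entity

entity = get_entity(EntityKey.EVENTS)

# The relocated helper asserts that the entity has a writable storage and
# hands back its TableWriter, from which the stream loader is reached.
stream_loader = enforce_table_writer(entity).get_stream_loader()
default_topic_spec = stream_loader.get_default_topic_spec()
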
12 changes: 0 additions & 12 deletions snuba/datasets/factory.py
@@ -2,7 +2,6 @@

from snuba import settings
from snuba.datasets.dataset import Dataset
from snuba.datasets.table_storage import TableWriter
from snuba.util import with_span
from snuba.utils.serializable_exception import SerializableException

@@ -80,14 +79,3 @@ def get_dataset_name(dataset: Dataset) -> str:

def get_enabled_dataset_names() -> Sequence[str]:
return [name for name in DATASET_NAMES if name not in settings.DISABLED_DATASETS]


# TODO: This should be removed and moved to the Entity since Datasets no longer control
# storages.
def enforce_table_writer(dataset: Dataset) -> TableWriter:
writable_storage = dataset.get_default_entity().get_writable_storage()

assert (
writable_storage is not None
), f"Dataset{dataset} does not have a writable storage."
return writable_storage.get_table_writer()
16 changes: 8 additions & 8 deletions snuba/subscriptions/subscription.py
@@ -2,8 +2,8 @@
from uuid import UUID, uuid1

from snuba.datasets.dataset import Dataset
from snuba.datasets.entities.factory import ENTITY_NAME_LOOKUP
from snuba.datasets.factory import enforce_table_writer
from snuba.datasets.entities import EntityKey
from snuba.datasets.entities.factory import enforce_table_writer, get_entity
from snuba.redis import redis_client
from snuba.subscriptions.data import (
PartitionId,
@@ -22,12 +22,13 @@ class SubscriptionCreator:
the resulting query is valid.
"""

def __init__(self, dataset: Dataset):
def __init__(self, dataset: Dataset, entity_key: EntityKey):
self.dataset = dataset
self.entity_key = ENTITY_NAME_LOOKUP[dataset.get_default_entity()]
self.entity_key = entity_key

entity = get_entity(entity_key)
self.__partitioner = TopicSubscriptionDataPartitioner(
enforce_table_writer(dataset).get_stream_loader().get_default_topic_spec()
enforce_table_writer(entity).get_stream_loader().get_default_topic_spec()
)

def create(self, data: SubscriptionData, timer: Timer) -> SubscriptionIdentifier:
@@ -53,9 +54,8 @@ class SubscriptionDeleter:
Handles deletion of a `Subscription`, based on its ID and partition.
"""

def __init__(self, dataset: Dataset, partition: PartitionId):
self.dataset = dataset
self.entity_key = ENTITY_NAME_LOOKUP[dataset.get_default_entity()]
def __init__(self, entity_key: EntityKey, partition: PartitionId):
self.entity_key = entity_key
self.partition = partition

def delete(self, subscription_id: UUID) -> None:
29 changes: 23 additions & 6 deletions snuba/web/views.py
@@ -10,6 +10,7 @@
Mapping,
MutableMapping,
MutableSequence,
Optional,
Sequence,
Text,
Tuple,
@@ -35,6 +36,7 @@
from snuba.consumers.types import KafkaMessageMetadata
from snuba.datasets.dataset import Dataset
from snuba.datasets.entities.factory import ENTITY_NAME_LOOKUP
from snuba.datasets.entity import Entity
from snuba.datasets.factory import (
InvalidDatasetError,
get_dataset,
@@ -57,7 +59,7 @@
from snuba.utils.metrics.timer import Timer
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.web import QueryException
from snuba.web.converters import DatasetConverter
from snuba.web.converters import DatasetConverter, EntityConverter
from snuba.web.query import parse_and_run_query
from snuba.writer import BatchWriterEncoderWrapper, WriterTableRow

@@ -138,6 +140,7 @@ def truncate_dataset(dataset: Dataset) -> None:
application.testing = settings.TESTING
application.debug = settings.DEBUG
application.url_map.converters["dataset"] = DatasetConverter
application.url_map.converters["entity"] = EntityConverter


@application.errorhandler(InvalidJsonRequestException)
@@ -435,25 +438,39 @@ def handle_subscription_error(exception: InvalidSubscriptionError) -> Response:
)


@application.route("/<dataset:dataset>/<entity:entity>/subscriptions", methods=["POST"])
@application.route("/<dataset:dataset>/subscriptions", methods=["POST"])
@util.time_request("subscription")
def create_subscription(*, dataset: Dataset, timer: Timer) -> RespTuple:
entity_key = ENTITY_NAME_LOOKUP[dataset.get_default_entity()]
def create_subscription(
*, dataset: Dataset, timer: Timer, entity: Optional[Entity] = None
) -> RespTuple:
if not entity:
entity = dataset.get_default_entity()
entity_key = ENTITY_NAME_LOOKUP[entity]

subscription = SubscriptionDataCodec(entity_key).decode(http_request.data)
identifier = SubscriptionCreator(dataset).create(subscription, timer)
identifier = SubscriptionCreator(dataset, entity_key).create(subscription, timer)
return (
json.dumps({"subscription_id": str(identifier)}),
202,
{"Content-Type": "application/json"},
)


@application.route(
"/<dataset:dataset>/<entity:entity>/subscriptions/<int:partition>/<key>",
methods=["DELETE"],
)
@application.route(
"/<dataset:dataset>/subscriptions/<int:partition>/<key>", methods=["DELETE"]
)
def delete_subscription(*, dataset: Dataset, partition: int, key: str) -> RespTuple:
SubscriptionDeleter(dataset, PartitionId(partition)).delete(UUID(key))
def delete_subscription(
*, dataset: Dataset, partition: int, key: str, entity: Optional[Entity] = None
) -> RespTuple:
if not entity:
entity = dataset.get_default_entity()
entity_key = ENTITY_NAME_LOOKUP[entity]
SubscriptionDeleter(entity_key, PartitionId(partition)).delete(UUID(key))
return "ok", 202, {"Content-Type": "text/plain"}


34 changes: 12 additions & 22 deletions tests/subscriptions/test_subscription.py
@@ -90,7 +90,7 @@ def subscription_rollout(self) -> Generator[None, None, None]:

@pytest.mark.parametrize("subscription", TESTS_CREATE)
def test(self, subscription: SubscriptionData) -> None:
creator = SubscriptionCreator(self.dataset)
creator = SubscriptionCreator(self.dataset, EntityKey.EVENTS)
identifier = creator.create(subscription, self.timer)
assert (
cast(
@@ -104,14 +104,14 @@ def test(self, subscription: SubscriptionData) -> None:

@pytest.mark.parametrize("subscription", TESTS_INVALID)
def test_invalid_condition_column(self, subscription: SubscriptionData) -> None:
creator = SubscriptionCreator(self.dataset)
creator = SubscriptionCreator(self.dataset, EntityKey.EVENTS)
with raises(QueryException):
creator.create(
subscription, self.timer,
)

def test_invalid_aggregation(self) -> None:
creator = SubscriptionCreator(self.dataset)
creator = SubscriptionCreator(self.dataset, EntityKey.EVENTS)
with raises(QueryException):
creator.create(
SnQLSubscriptionData(
@@ -125,7 +125,7 @@ def test_invalid_aggregation(self) -> None:
)

def test_invalid_time_window(self) -> None:
creator = SubscriptionCreator(self.dataset)
creator = SubscriptionCreator(self.dataset, EntityKey.EVENTS)
with raises(InvalidSubscriptionError):
creator.create(
SnQLSubscriptionData(
@@ -168,7 +168,7 @@ def test_invalid_time_window(self) -> None:
)

def test_invalid_resolution(self) -> None:
creator = SubscriptionCreator(self.dataset)
creator = SubscriptionCreator(self.dataset, EntityKey.EVENTS)
with raises(InvalidSubscriptionError):
creator.create(
SnQLSubscriptionData(
@@ -188,7 +188,7 @@ class TestSessionsSubscriptionCreator:
@pytest.mark.parametrize("subscription", TESTS_CREATE_SESSIONS)
def test(self, subscription: SubscriptionData) -> None:
dataset = get_dataset("sessions")
creator = SubscriptionCreator(dataset)
creator = SubscriptionCreator(dataset, EntityKey.SESSIONS)
identifier = creator.create(subscription, self.timer)
assert (
cast(
@@ -255,19 +255,7 @@ class TestMetricsCountersSubscriptionCreator:

@pytest.mark.parametrize("subscription", TESTS_CREATE_METRICS)
def test(self, subscription: SubscriptionData) -> None:
creator = SubscriptionCreator(self.dataset)
# XXX (ahmed): hack to circumvent using the default entity of a dataset as the default
# entity for the metrics dataset is METRICS_SETS, and this subscription type is currently
# not supported. Will add a fix shortly that relies on passing the entity key rather
# than relying on fetching the default entity from a dataset
creator.entity_key = EntityKey.METRICS_COUNTERS
writable_storage = get_entity(creator.entity_key).get_writable_storage()
assert writable_storage is not None
creator.__partitioner = TopicSubscriptionDataPartitioner(
writable_storage.get_table_writer()
.get_stream_loader()
.get_default_topic_spec()
)
creator = SubscriptionCreator(self.dataset, EntityKey.METRICS_COUNTERS)
identifier = creator.create(subscription, self.timer)
assert (
cast(
@@ -283,7 +271,7 @@ def test(self, subscription: SubscriptionData) -> None:
def test_missing_conditions_for_groupby_clause(
self, subscription: SubscriptionData
) -> None:
creator = SubscriptionCreator(self.dataset)
creator = SubscriptionCreator(self.dataset, EntityKey.METRICS_COUNTERS)
with raises(InvalidQueryException):
creator.create(
subscription, self.timer,
@@ -292,7 +280,7 @@ def test_missing_conditions_for_groupby_clause(

class TestSubscriptionDeleter(BaseSubscriptionTest):
def test(self) -> None:
creator = SubscriptionCreator(self.dataset)
creator = SubscriptionCreator(self.dataset, EntityKey.EVENTS)
subscription = SnQLSubscriptionData(
project_id=1,
query="MATCH (events) SELECT count() AS count",
@@ -311,7 +299,7 @@ def test(self) -> None:
== subscription
)

SubscriptionDeleter(self.dataset, identifier.partition).delete(identifier.uuid)
SubscriptionDeleter(self.entity_key, identifier.partition).delete(
identifier.uuid
)
assert (
RedisSubscriptionDataStore(
redis_client, self.entity_key, identifier.partition,