Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(MDC): Topics Enum to Class (with fix) #3268

Closed
wants to merge 9 commits into from
2 changes: 1 addition & 1 deletion snuba/attribution/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def record_attribution(attr_data: AttributionData) -> None:
producer.poll(0) # trigger queued delivery callbacks
producer.produce(
settings.KAFKA_TOPIC_MAP.get(
Topic.ATTRIBUTION.value, Topic.ATTRIBUTION.value
Topic.SNUBA_ATTRIBUTION.value, Topic.SNUBA_ATTRIBUTION.value
),
data_str.encode("utf-8"),
on_delivery=_record_attribution_delivery_callback,
Expand Down
6 changes: 3 additions & 3 deletions snuba/datasets/configuration/storage_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
build_kafka_stream_loader_from_settings,
)
from snuba.subscriptions.utils import SchedulingWatermarkMode
from snuba.utils.streams.topics import Topic
from snuba.utils.streams.topics import Topic, register_topic

KIND = "kind"
WRITABLE_STORAGE = "writable_storage"
Expand Down Expand Up @@ -91,7 +91,7 @@ def build_stream_loader(loader_config: dict[str, Any]) -> KafkaStreamLoader:
processor = DatasetMessageProcessor.get_from_name(
loader_config["processor"]
).from_kwargs()
default_topic = Topic(loader_config["default_topic"])
default_topic = register_topic(loader_config["default_topic"])
# optionals
pre_filter = None
if PRE_FILTER in loader_config and loader_config[PRE_FILTER] is not None:
Expand Down Expand Up @@ -131,7 +131,7 @@ def build_stream_loader(loader_config: dict[str, Any]) -> KafkaStreamLoader:

def __get_topic(stream_loader_config: dict[str, Any], name: str | None) -> Topic | None:
return (
Topic(stream_loader_config[name])
register_topic(stream_loader_config[name])
if name in stream_loader_config and stream_loader_config[name] is not None
else None
)
Empty file.
28 changes: 28 additions & 0 deletions snuba/datasets/configuration/validation/post_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Set

from snuba import settings
from snuba.utils.streams.topics import Topic


class InvalidTopicError(ValueError):
pass


def validate_storages() -> None:
validate_topics_with_settings()


def validate_topics_with_settings() -> None:
"""
This function validates topics specified in settings are valid
topics that have been loaded into the Topics registry (via storage builder).
"""
topic_names: Set[str] = set([t.value for t in Topic])

for key in settings.KAFKA_TOPIC_MAP.keys():
if key not in topic_names:
raise InvalidTopicError(f"Invalid topic value: {key}")

for key in settings.KAFKA_BROKER_CONFIG.keys():
if key not in topic_names:
raise ValueError(f"Invalid topic value {key}")
6 changes: 3 additions & 3 deletions snuba/datasets/storages/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
processor=ErrorsProcessor(promoted_tag_columns),
default_topic=Topic.EVENTS,
replacement_topic=Topic.EVENT_REPLACEMENTS,
commit_log_topic=Topic.COMMIT_LOG,
commit_log_topic=Topic.SNUBA_COMMIT_LOG,
subscription_scheduler_mode=SchedulingWatermarkMode.PARTITION,
subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_EVENTS,
subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_EVENTS,
subscription_scheduled_topic=Topic.SCHEDULED_SUBSCRIPTIONS_EVENTS,
subscription_result_topic=Topic.EVENTS_SUBSCRIPTION_RESULTS,
),
# This is the default, just showing where it goes for the PR
write_format=WriteFormat.JSON,
Expand Down
6 changes: 3 additions & 3 deletions snuba/datasets/storages/errors_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@
processor=ErrorsProcessor(promoted_tag_columns),
default_topic=Topic.EVENTS,
replacement_topic=Topic.EVENT_REPLACEMENTS,
commit_log_topic=Topic.COMMIT_LOG,
commit_log_topic=Topic.SNUBA_COMMIT_LOG,
subscription_scheduler_mode=SchedulingWatermarkMode.PARTITION,
subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_EVENTS,
subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_EVENTS,
subscription_scheduled_topic=Topic.SCHEDULED_SUBSCRIPTIONS_EVENTS,
subscription_result_topic=Topic.EVENTS_SUBSCRIPTION_RESULTS,
),
replacer_processor=ErrorsReplacer(
schema=schema,
Expand Down
2 changes: 2 additions & 0 deletions snuba/datasets/storages/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from snuba import settings
from snuba.datasets.cdc import CdcStorage
from snuba.datasets.configuration.storage_builder import build_storage
from snuba.datasets.configuration.validation.post_loader import validate_storages
from snuba.datasets.storage import ReadableTableStorage, Storage, WritableTableStorage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.state import get_config
Expand All @@ -28,6 +29,7 @@ def __init__(self) -> None:
self._dev_non_writable_storages: dict[StorageKey, Storage] = {}
self._all_storages: dict[StorageKey, Storage] = {}
self.__initialize()
validate_storages()

def __initialize(self) -> None:

Expand Down
2 changes: 1 addition & 1 deletion snuba/datasets/storages/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
query_processors=[],
stream_loader=build_kafka_stream_loader_from_settings(
processor=FunctionsMessageProcessor(),
default_topic=Topic.PROFILES_FUNCTIONS,
default_topic=Topic.PROFILES_CALL_TREE,
),
)

Expand Down
20 changes: 10 additions & 10 deletions snuba/datasets/storages/generic_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
"""
return ProduceInvalidMessagePolicy(
KafkaProducer(
build_kafka_producer_configuration(Topic.DEAD_LETTER_GENERIC_METRICS)
build_kafka_producer_configuration(Topic.SNUBA_DEAD_LETTER_GENERIC_METRICS)
),
KafkaTopic(Topic.DEAD_LETTER_GENERIC_METRICS.value),
KafkaTopic(Topic.SNUBA_DEAD_LETTER_GENERIC_METRICS.value),
)


Expand Down Expand Up @@ -137,12 +137,12 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
query_processors=[],
stream_loader=build_kafka_stream_loader_from_settings(
processor=GenericSetsMetricsProcessor(),
default_topic=Topic.GENERIC_METRICS,
default_topic=Topic.SNUBA_GENERIC_METRICS,
dead_letter_queue_policy_creator=produce_policy_creator,
commit_log_topic=Topic.GENERIC_METRICS_SETS_COMMIT_LOG,
subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_GENERIC_METRICS_SETS,
commit_log_topic=Topic.SNUBA_GENERIC_METRICS_SETS_COMMIT_LOG,
subscription_scheduled_topic=Topic.SCHEDULED_SUBSCRIPTIONS_GENERIC_METRICS_SETS,
subscription_scheduler_mode=SchedulingWatermarkMode.GLOBAL,
subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_GENERIC_METRICS_SETS,
subscription_result_topic=Topic.GENERIC_METRICS_SETS_SUBSCRIPTION_RESULTS,
pre_filter=KafkaHeaderSelectFilter("metric_type", InputType.SET.value),
),
)
Expand Down Expand Up @@ -177,12 +177,12 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
query_processors=[],
stream_loader=build_kafka_stream_loader_from_settings(
processor=GenericDistributionsMetricsProcessor(),
default_topic=Topic.GENERIC_METRICS,
default_topic=Topic.SNUBA_GENERIC_METRICS,
dead_letter_queue_policy_creator=produce_policy_creator,
commit_log_topic=Topic.GENERIC_METRICS_DISTRIBUTIONS_COMMIT_LOG,
subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_GENERIC_METRICS_DISTRIBUTIONS,
commit_log_topic=Topic.SNUBA_GENERIC_METRICS_DISTRIBUTIONS_COMMIT_LOG,
subscription_scheduled_topic=Topic.SCHEDULED_SUBSCRIPTIONS_GENERIC_METRICS_DISTRIBUTIONS,
subscription_scheduler_mode=SchedulingWatermarkMode.GLOBAL,
subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_GENERIC_METRICS_DISTRIBUTIONS,
subscription_result_topic=Topic.GENERIC_METRICS_DISTRIBUTIONS_SUBSCRIPTION_RESULTS,
pre_filter=KafkaHeaderSelectFilter("metric_type", InputType.DISTRIBUTION.value),
),
)
20 changes: 11 additions & 9 deletions snuba/datasets/storages/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
Produce all bad messages to dead-letter topic.
"""
return ProduceInvalidMessagePolicy(
KafkaProducer(build_kafka_producer_configuration(Topic.DEAD_LETTER_METRICS)),
KafkaTopic(Topic.DEAD_LETTER_METRICS.value),
KafkaProducer(
build_kafka_producer_configuration(Topic.SNUBA_DEAD_LETTER_METRICS)
),
KafkaTopic(Topic.SNUBA_DEAD_LETTER_METRICS.value),
)


Expand All @@ -90,11 +92,11 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
query_processors=[],
stream_loader=build_kafka_stream_loader_from_settings(
processor=PolymorphicMetricsProcessor(),
default_topic=Topic.METRICS,
commit_log_topic=Topic.METRICS_COMMIT_LOG,
default_topic=Topic.SNUBA_METRICS,
commit_log_topic=Topic.SNUBA_METRICS_COMMIT_LOG,
subscription_scheduler_mode=SchedulingWatermarkMode.GLOBAL,
subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_METRICS,
subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_METRICS,
subscription_scheduled_topic=Topic.SCHEDULED_SUBSCRIPTIONS_METRICS,
subscription_result_topic=Topic.METRICS_SUBSCRIPTION_RESULTS,
dead_letter_queue_policy_creator=produce_policy_creator,
),
)
Expand Down Expand Up @@ -128,7 +130,7 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
query_processors=[ArrayJoinKeyValueOptimizer("tags"), TableRateLimit()],
stream_loader=build_kafka_stream_loader_from_settings(
SetsAggregateProcessor(),
default_topic=Topic.METRICS,
default_topic=Topic.SNUBA_METRICS,
dead_letter_queue_policy_creator=produce_policy_creator,
),
write_format=WriteFormat.VALUES,
Expand All @@ -151,7 +153,7 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
query_processors=[ArrayJoinKeyValueOptimizer("tags"), TableRateLimit()],
stream_loader=build_kafka_stream_loader_from_settings(
CounterAggregateProcessor(),
default_topic=Topic.METRICS,
default_topic=Topic.SNUBA_METRICS,
dead_letter_queue_policy_creator=produce_policy_creator,
),
write_format=WriteFormat.VALUES,
Expand Down Expand Up @@ -205,7 +207,7 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
query_processors=[ArrayJoinKeyValueOptimizer("tags"), TableRateLimit()],
stream_loader=build_kafka_stream_loader_from_settings(
DistributionsAggregateProcessor(),
default_topic=Topic.METRICS,
default_topic=Topic.SNUBA_METRICS,
dead_letter_queue_policy_creator=produce_policy_creator,
),
write_format=WriteFormat.VALUES,
Expand Down
2 changes: 1 addition & 1 deletion snuba/datasets/storages/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

loader = build_kafka_stream_loader_from_settings(
processor=ProfilesMessageProcessor(),
default_topic=Topic.PROFILES,
default_topic=Topic.PROCESSED_PROFILES,
)

readable_columns = ColumnSet(
Expand Down
2 changes: 1 addition & 1 deletion snuba/datasets/storages/querylog.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@
query_processors=[],
stream_loader=build_kafka_stream_loader_from_settings(
processor=QuerylogProcessor(),
default_topic=Topic.QUERYLOG,
default_topic=Topic.SNUBA_QUERIES,
),
)
8 changes: 5 additions & 3 deletions snuba/datasets/storages/replays.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@
def produce_policy_creator() -> DeadLetterQueuePolicy:
"""Produce all bad messages to dead-letter topic."""
return ProduceInvalidMessagePolicy(
KafkaProducer(build_kafka_producer_configuration(Topic.DEAD_LETTER_REPLAYS)),
KafkaTopic(Topic.DEAD_LETTER_REPLAYS.value),
KafkaProducer(
build_kafka_producer_configuration(Topic.SNUBA_DEAD_LETTER_REPLAYS)
),
KafkaTopic(Topic.SNUBA_DEAD_LETTER_REPLAYS.value),
)


Expand All @@ -102,7 +104,7 @@ def produce_policy_creator() -> DeadLetterQueuePolicy:
mandatory_condition_checkers=[ProjectIdEnforcer()],
stream_loader=build_kafka_stream_loader_from_settings(
processor=ReplaysProcessor(),
default_topic=Topic.REPLAYEVENTS,
default_topic=Topic.INGEST_REPLAY_EVENTS,
dead_letter_queue_policy_creator=produce_policy_creator,
),
)
2 changes: 1 addition & 1 deletion snuba/datasets/storages/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def process_query(self, query: Query, query_settings: QuerySettings) -> None:


kafka_stream_loader = build_kafka_stream_loader_from_settings(
processor=SessionsProcessor(), default_topic=Topic.SESSIONS
processor=SessionsProcessor(), default_topic=Topic.INGEST_SESSIONS
)

raw_storage = WritableTableStorage(
Expand Down
6 changes: 3 additions & 3 deletions snuba/datasets/storages/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
stream_loader=build_kafka_stream_loader_from_settings(
processor=TransactionsMessageProcessor(),
default_topic=Topic.TRANSACTIONS,
commit_log_topic=Topic.TRANSACTIONS_COMMIT_LOG,
commit_log_topic=Topic.SNUBA_TRANSACTIONS_COMMIT_LOG,
subscription_scheduler_mode=SchedulingWatermarkMode.GLOBAL,
subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_TRANSACTIONS,
subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_TRANSACTIONS,
subscription_scheduled_topic=Topic.SCHEDULED_SUBSCRIPTIONS_TRANSACTIONS,
subscription_result_topic=Topic.TRANSACTIONS_SUBSCRIPTION_RESULTS,
),
query_splitters=query_splitters,
mandatory_condition_checkers=mandatory_condition_checkers,
Expand Down
6 changes: 3 additions & 3 deletions snuba/datasets/storages/transactions_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
stream_loader=build_kafka_stream_loader_from_settings(
processor=TransactionsMessageProcessor(),
default_topic=Topic.TRANSACTIONS,
commit_log_topic=Topic.TRANSACTIONS_COMMIT_LOG,
commit_log_topic=Topic.SNUBA_TRANSACTIONS_COMMIT_LOG,
subscription_scheduler_mode=SchedulingWatermarkMode.GLOBAL,
subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_TRANSACTIONS,
subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_TRANSACTIONS,
subscription_scheduled_topic=Topic.SCHEDULED_SUBSCRIPTIONS_TRANSACTIONS,
subscription_result_topic=Topic.TRANSACTIONS_SUBSCRIPTION_RESULTS,
),
query_splitters=query_splitters,
mandatory_condition_checkers=mandatory_condition_checkers,
Expand Down
48 changes: 0 additions & 48 deletions snuba/settings/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,54 +28,6 @@ def validate_settings(locals: Mapping[str, Any]) -> None:
"DEFAULT_STORAGE_BROKERS is deprecated. Use KAFKA_BROKER_CONFIG instead."
)

topic_names = {
"events",
"event-replacements",
"transactions",
"snuba-commit-log",
"snuba-transactions-commit-log",
"snuba-sessions-commit-log",
"snuba-metrics-commit-log",
"cdc",
"snuba-metrics",
"outcomes",
"ingest-sessions",
"snuba-queries",
"scheduled-subscriptions-events",
"scheduled-subscriptions-transactions",
"scheduled-subscriptions-sessions",
"scheduled-subscriptions-metrics",
"scheduled-subscriptions-generic-metrics-sets",
"scheduled-subscriptions-generic-metrics-distributions",
"events-subscription-results",
"transactions-subscription-results",
"sessions-subscription-results",
"metrics-subscription-results",
"generic-metrics-sets-subscription-results",
"generic-metrics-distributions-subscription-results",
"snuba-dead-letter-inserts",
"processed-profiles",
"snuba-attribution",
"profiles-call-tree",
"ingest-replay-events",
"snuba-replay-events",
"snuba-dead-letter-replays",
"snuba-generic-metrics",
"snuba-generic-metrics-sets-commit-log",
"snuba-generic-metrics-distributions-commit-log",
"snuba-dead-letter-generic-metrics",
"snuba-dead-letter-sessions",
"snuba-dead-letter-metrics",
}

for key in locals["KAFKA_TOPIC_MAP"].keys():
if key not in topic_names:
raise InvalidTopicError(f"Invalid topic value: {key}")

for key in locals["KAFKA_BROKER_CONFIG"].keys():
if key not in topic_names:
raise ValueError(f"Invalid topic value {key}")

# Validate cluster configuration
from snuba.clusters.storage_sets import StorageSetKey

Expand Down
4 changes: 3 additions & 1 deletion snuba/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,9 @@ def record_query(query_metadata: Mapping[str, Any]) -> None:
).execute()
producer.poll(0) # trigger queued delivery callbacks
producer.produce(
settings.KAFKA_TOPIC_MAP.get(Topic.QUERYLOG.value, Topic.QUERYLOG.value),
settings.KAFKA_TOPIC_MAP.get(
Topic.SNUBA_QUERIES.value, Topic.SNUBA_QUERIES.value
),
data.encode("utf-8"),
on_delivery=_record_query_delivery_callback,
)
Expand Down
Loading