feat(slices): Make consumers "slice-aware" #3259

Merged
merged 41 commits on Nov 14, 2022
Changes shown from 37 commits

Commits
41 commits
aac106c
Create topics class and registry
enochtangg Oct 7, 2022
8088d3d
Rename topic keys to mirror logical values
enochtangg Oct 11, 2022
36c14be
Move topics list to settings
enochtangg Oct 12, 2022
18340a0
Add slicing to configuration builder
Oct 13, 2022
56223ae
Add slicing to consumer builder
Oct 13, 2022
ce6e3e2
Add slicing logic to consumer cli
Oct 13, 2022
0870749
Add slice_id dependent commit log topic
Oct 13, 2022
7b49b9d
Add clarifying comments
Oct 14, 2022
ec2e7ba
Test changes/add comments
Oct 14, 2022
5e6f762
Attempt to rebase
Oct 20, 2022
91def84
Rename topic keys to mirror logical values
enochtangg Oct 11, 2022
0e149d8
More rebasing
Oct 20, 2022
9213a6d
Add cluster selection
Oct 14, 2022
4cbec25
Rebase
Oct 20, 2022
7e76304
fix typing
enochtangg Oct 14, 2022
bbc4fc6
Use cluster selection
Oct 14, 2022
1c3bf3d
fix typing
Oct 14, 2022
5fa3189
Remove mutable property of TableWriter
Oct 17, 2022
cbce8b5
Add slice id parameters where needed
Oct 17, 2022
0c414b5
Remove code assuming slice id to be a property of TableWriter
Oct 17, 2022
771ee37
Remove comments
Oct 17, 2022
ce12536
Rename different topics
Oct 18, 2022
1de82db
Remove old comment
Oct 18, 2022
9d020a4
Add slice id validation
Oct 18, 2022
b81609b
Tested sliced consumer behavior + get rid of snuba topic mapping
Oct 20, 2022
5e91265
Continuing to rebase
Oct 20, 2022
27e70cf
Remove extra newline
Oct 20, 2022
470a365
add assert to prevent override raw topic + slicing situation
Oct 25, 2022
5375282
Moving logical to physical topic mapping into KafkaTopicSpec
Oct 31, 2022
9510a50
Add docstring comment for validation
Oct 31, 2022
fd44db3
Add test file
Nov 3, 2022
ecf06df
Merge branch 'master' of github.com:getsentry/snuba into sliced-consu…
Nov 4, 2022
7ba8ce6
Consistent topic handling and resolving
Nov 4, 2022
8c8425d
Add test for get_physical_topic_name
Nov 7, 2022
b8c25f5
Move validation to partitioning file
Nov 7, 2022
e9d52f6
Fix broken test (get_physical_topic_name)
Nov 8, 2022
db7e972
Remove defaults
Nov 10, 2022
9f06bba
Fix mypy issue
Nov 10, 2022
2fe8679
Add test
Nov 10, 2022
dde5c5a
config(slicing) - add slice ID to emitted metrics tags (#3374)
onewland Nov 11, 2022
9003367
ArroyoTopic --> Topic
Nov 14, 2022
10 changes: 9 additions & 1 deletion snuba/cli/consumer.py
@@ -50,6 +50,12 @@
help="The storage to target",
required=True,
)
@click.option(
"--slice-id",
"slice_id",
type=int,
help="The slice id for the storage",
)
@click.option(
"--max-batch-size",
default=settings.DEFAULT_MAX_BATCH_SIZE,
@@ -121,6 +127,7 @@ def consumer(
consumer_group: str,
bootstrap_server: Sequence[str],
storage_name: str,
slice_id: Optional[int],
max_batch_size: int,
max_batch_time_ms: int,
auto_offset_reset: str,
@@ -176,10 +183,11 @@ def stats_callback(stats_json: str) -> None:
profile_path=profile_path,
stats_callback=stats_callback,
parallel_collect=parallel_collect,
slice_id=slice_id,
cooperative_rebalancing=cooperative_rebalancing,
)

consumer = consumer_builder.build_base_consumer()
consumer = consumer_builder.build_base_consumer(slice_id)

def handler(signum: int, frame: Any) -> None:
consumer.signal_shutdown()
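For reference, the new option follows the usual click pattern for an optional integer flag: when `--slice-id` is omitted, the handler receives `None`, so un-sliced deployments behave exactly as before. A minimal standalone sketch (the command name and echoed output are made up for illustration, not the real Snuba CLI):

```python
from typing import Optional

import click


@click.command()
@click.option("--storage", "storage_name", required=True, help="The storage to target")
@click.option("--slice-id", "slice_id", type=int, help="The slice id for the storage")
def demo_consumer(storage_name: str, slice_id: Optional[int]) -> None:
    # slice_id is None unless --slice-id is explicitly passed on the command line.
    if slice_id is None:
        click.echo(f"consuming {storage_name} (unsliced)")
    else:
        click.echo(f"consuming {storage_name}, slice {slice_id}")


if __name__ == "__main__":
    demo_consumer()
```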
2 changes: 2 additions & 0 deletions snuba/consumers/consumer.py
@@ -334,6 +334,7 @@ def build_batch_writer(
metrics: MetricsBackend,
replacements_producer: Optional[ConfluentKafkaProducer] = None,
replacements_topic: Optional[Topic] = None,
slice_id: Optional[int] = None,
) -> Callable[[], ProcessedMessageBatchWriter]:

assert not (replacements_producer is None) ^ (replacements_topic is None)
@@ -342,6 +343,7 @@
writer = table_writer.get_batch_writer(
metrics,
{"load_balancing": "in_order", "insert_distributed_sync": 1},
slice_id=slice_id,
)

def build_writer() -> ProcessedMessageBatchWriter:
60 changes: 44 additions & 16 deletions snuba/consumers/consumer_builder.py
@@ -3,7 +3,7 @@
from dataclasses import dataclass
from typing import Callable, Optional, Sequence

from arroyo import Topic
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
from arroyo.commit import IMMEDIATE
from arroyo.processing import StreamProcessor
@@ -18,6 +18,7 @@
build_mock_batch_writer,
process_message,
)
from snuba.datasets.partitioning import validate_passed_slice
from snuba.datasets.storages.factory import get_writable_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.environment import setup_sentry
@@ -77,6 +78,7 @@ def __init__(
max_batch_time_ms: int,
metrics: MetricsBackend,
parallel_collect: bool,
slice_id: Optional[int],
stats_callback: Optional[Callable[[str], None]] = None,
commit_retry_policy: Optional[RetryPolicy] = None,
profile_path: Optional[str] = None,
@@ -93,12 +95,15 @@ def __init__(
.topic
)

validate_passed_slice(storage_key, slice_id)

self.broker_config = get_default_kafka_configuration(
topic, bootstrap_servers=kafka_params.bootstrap_servers
topic, slice_id, bootstrap_servers=kafka_params.bootstrap_servers
)
logger.info(f"librdkafka log level: {self.broker_config.get('log_level', 6)}")
self.producer_broker_config = build_kafka_producer_configuration(
topic,
slice_id,
bootstrap_servers=kafka_params.bootstrap_servers,
override_params={
"partitioner": "consistent",
@@ -108,29 +113,37 @@

stream_loader = self.storage.get_table_writer().get_stream_loader()

self.raw_topic: Topic
self.raw_topic: ArroyoTopic
[Review thread on this rename]

Member: Personal preference, I'd rather you just left all these names.

Member (author): The issue with leaving the names is the potential confusion between a (Snuba) Topic and an (Arroyo) Topic. If both are Topic (how it was originally), it can become confusing to work with the code in future iterations.

Member: I think the issue is that the other topic (SnubaTopic) is being imported here, but that should never be the case. It should always be accessed via the stream loader and KafkaTopicSpec mechanism to ensure that topic resolving is done correctly.

Member (author): This makes sense. I no longer have those imports. Shall I change ArroyoTopic back to Topic?

lynnagara (Member), Nov 5, 2022: Yes. That seems good to me. It feels a little bit more consistent with what the other files do.

Member: Bumping this. Can you please revert the change?

if kafka_params.raw_topic is not None:
self.raw_topic = Topic(kafka_params.raw_topic)
self.raw_topic = ArroyoTopic(kafka_params.raw_topic)
else:
self.raw_topic = Topic(stream_loader.get_default_topic_spec().topic_name)
default_topic_spec = stream_loader.get_default_topic_spec()
self.raw_topic = ArroyoTopic(
default_topic_spec.get_physical_topic_name(slice_id)
)

self.replacements_topic: Optional[Topic]
self.replacements_topic: Optional[ArroyoTopic]
if kafka_params.replacements_topic is not None:
self.replacements_topic = Topic(kafka_params.replacements_topic)
self.replacements_topic = ArroyoTopic(kafka_params.replacements_topic)
else:
replacement_topic_spec = stream_loader.get_replacement_topic_spec()
if replacement_topic_spec is not None:
self.replacements_topic = Topic(replacement_topic_spec.topic_name)
self.replacements_topic = ArroyoTopic(
replacement_topic_spec.get_physical_topic_name(slice_id)
)
else:
self.replacements_topic = None

self.commit_log_topic: Optional[Topic]
self.commit_log_topic: Optional[ArroyoTopic]
if kafka_params.commit_log_topic is not None:
self.commit_log_topic = Topic(kafka_params.commit_log_topic)
self.commit_log_topic = ArroyoTopic(kafka_params.commit_log_topic)

else:
commit_log_topic_spec = stream_loader.get_commit_log_topic_spec()
if commit_log_topic_spec is not None:
self.commit_log_topic = Topic(commit_log_topic_spec.topic_name)
self.commit_log_topic = ArroyoTopic(
commit_log_topic_spec.get_physical_topic_name(slice_id)
)
else:
self.commit_log_topic = None

@@ -172,15 +185,24 @@ def __init__(
self.__commit_retry_policy = commit_retry_policy

def __build_consumer(
self, strategy_factory: ProcessingStrategyFactory[KafkaPayload]
self,
strategy_factory: ProcessingStrategyFactory[KafkaPayload],
slice_id: Optional[int] = None,
) -> StreamProcessor[KafkaPayload]:
configuration = build_kafka_consumer_configuration(

# retrieves the default logical topic
topic = (
self.storage.get_table_writer()
.get_stream_loader()
.get_default_topic_spec()
.topic,
.topic
)

configuration = build_kafka_consumer_configuration(
topic,
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
slice_id=slice_id,
auto_offset_reset=self.auto_offset_reset,
strict_offset_reset=self.strict_offset_reset,
queued_max_messages_kbytes=self.queued_max_messages_kbytes,
@@ -220,6 +242,7 @@ def __build_consumer(

def __build_streaming_strategy_factory(
self,
slice_id: Optional[int] = None,
) -> ProcessingStrategyFactory[KafkaPayload]:
table_writer = self.storage.get_table_writer()
stream_loader = table_writer.get_stream_loader()
@@ -240,6 +263,7 @@ def __build_streaming_strategy_factory(
self.producer if self.replacements_topic is not None else None
),
replacements_topic=self.replacements_topic,
slice_id=slice_id,
)
if self.__mock_parameters is None
else build_mock_batch_writer(
@@ -267,8 +291,12 @@

return strategy_factory

def build_base_consumer(self) -> StreamProcessor[KafkaPayload]:
def build_base_consumer(
self, slice_id: Optional[int] = None
) -> StreamProcessor[KafkaPayload]:
"""
Builds the consumer.
"""
return self.__build_consumer(self.__build_streaming_strategy_factory())
return self.__build_consumer(
self.__build_streaming_strategy_factory(slice_id), slice_id
)
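To make the topic handling above concrete, here is a minimal standalone sketch of how the builder now picks the raw topic (the replacements and commit-log topics follow the same pattern): an explicitly passed override topic is used verbatim, otherwise the storage's default logical topic is mapped to its per-slice physical name. The helper names and map entries below are illustrative stand-ins, not Snuba's real settings:

```python
from typing import Mapping, Optional, Tuple

# Illustrative per-slice topic map, keyed on (logical topic name, slice id).
# Real deployments define this as settings.SLICED_KAFKA_TOPIC_MAP.
SLICED_KAFKA_TOPIC_MAP: Mapping[Tuple[str, int], str] = {
    ("ingest-replay-events", 0): "ingest-replay-events-0",
    ("ingest-replay-events", 1): "ingest-replay-events-1",
}


def physical_topic_name(logical_name: str, slice_id: Optional[int]) -> str:
    # Mirrors KafkaTopicSpec.get_physical_topic_name: unsliced consumers keep
    # using the logical topic name unchanged.
    if slice_id is None:
        return logical_name
    return SLICED_KAFKA_TOPIC_MAP[(logical_name, slice_id)]


def resolve_raw_topic(
    override_topic: Optional[str], default_logical_name: str, slice_id: Optional[int]
) -> str:
    # An override topic passed on the CLI wins; otherwise the default topic
    # spec is resolved to its physical name for this slice.
    if override_topic is not None:
        return override_topic
    return physical_topic_name(default_logical_name, slice_id)


assert resolve_raw_topic(None, "ingest-replay-events", 1) == "ingest-replay-events-1"
assert resolve_raw_topic("custom-topic", "ingest-replay-events", 1) == "custom-topic"
assert resolve_raw_topic(None, "ingest-replay-events", None) == "ingest-replay-events"
```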
15 changes: 15 additions & 0 deletions snuba/datasets/partitioning.py
@@ -3,6 +3,8 @@
should be stored. These do not require individual physical partitions but allow
for repartitioning with less code changes per physical change.
"""
from typing import Optional

from snuba.datasets.storages.storage_key import StorageKey

SENTRY_LOGICAL_PARTITIONS = 256
@@ -39,3 +41,16 @@ def is_storage_partitioned(storage: StorageKey) -> bool:
from snuba.settings import SLICED_STORAGES

return True if storage.value in SLICED_STORAGES.keys() else False


def validate_passed_slice(storage_key: StorageKey, slice_id: Optional[int]) -> None:
"""
Verifies that the given storage can be sliced
and that the slice_id passed in is within the range
of the total number of slices for the given storage
"""
from snuba.settings import SLICED_STORAGES

if slice_id is not None:
assert storage_key.value in SLICED_STORAGES
assert slice_id < SLICED_STORAGES[storage_key.value]
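In other words, SLICED_STORAGES maps a storage key's string value to its slice count, and the validation asserts that a slice id is only passed for a sliced storage and stays below that count. A standalone sketch with illustrative values (the storage name and count are examples only):

```python
from typing import Mapping, Optional

# Illustrative slice counts keyed on StorageKey.value; real deployments set
# these in settings.SLICED_STORAGES.
SLICED_STORAGES: Mapping[str, int] = {"generic_metrics_sets": 2}


def validate_passed_slice(storage_key_value: str, slice_id: Optional[int]) -> None:
    # Same checks as snuba.datasets.partitioning.validate_passed_slice,
    # written against a plain string key so the sketch stays standalone.
    if slice_id is not None:
        assert storage_key_value in SLICED_STORAGES, "storage is not sliced"
        assert slice_id < SLICED_STORAGES[storage_key_value], "slice id out of range"


validate_passed_slice("generic_metrics_sets", None)  # always fine: no slice requested
validate_passed_slice("generic_metrics_sets", 1)     # fine: 1 < 2
# validate_passed_slice("generic_metrics_sets", 2)   # would fail: only slices 0 and 1 exist
# validate_passed_slice("errors", 0)                 # would fail: storage is not sliced
```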
12 changes: 11 additions & 1 deletion snuba/datasets/table_storage.py
@@ -17,6 +17,7 @@
from snuba.datasets.schemas.tables import WritableTableSchema, WriteFormat
from snuba.processor import MessageProcessor
from snuba.replacers.replacer_processor import ReplacerProcessor
from snuba.settings import SLICED_KAFKA_TOPIC_MAP
from snuba.snapshots import BulkLoadSource
from snuba.snapshots.loaders import BulkLoader
from snuba.snapshots.loaders.single_table import RowProcessor, SingleTableBulkLoader
@@ -57,6 +58,14 @@ def topic_creation_config(self) -> Mapping[str, str]:
def __eq__(self, other: object) -> bool:
return isinstance(other, KafkaTopicSpec) and self.topic == other.topic

def get_physical_topic_name(self, slice_id: Optional[int] = None) -> str:
if slice_id is not None:
physical_topic = SLICED_KAFKA_TOPIC_MAP[(self.topic_name, slice_id)]
else:
physical_topic = self.topic_name

return physical_topic


def get_topic_name(topic: Topic) -> str:
return settings.KAFKA_TOPIC_MAP.get(topic.value, topic.value)
@@ -225,6 +234,7 @@ def get_batch_writer(
options: ClickhouseWriterOptions = None,
table_name: Optional[str] = None,
chunk_size: int = settings.CLICKHOUSE_HTTP_CHUNK_SIZE,
slice_id: Optional[int] = None,
) -> BatchWriter[JSONRow]:
table_name = table_name or self.__table_schema.get_table_name()
if self.__write_format == WriteFormat.JSON:
@@ -240,7 +250,7 @@
raise TypeError("unknown table format", self.__write_format)
options = self.__update_writer_options(options)

return get_cluster(self.__storage_set).get_batch_writer(
return get_cluster(self.__storage_set, slice_id).get_batch_writer(
metrics,
insert_statement,
encoding=None,
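The other change in this file threads slice_id into cluster selection, so a sliced storage's batch writer targets the ClickHouse cluster registered for that (storage set, slice) pair instead of the single default cluster. The sketch below only illustrates that lookup idea; the configuration structure shown is a made-up stand-in, not Snuba's actual sliced-cluster settings:

```python
from typing import Mapping, Optional, Tuple

# Made-up stand-in for per-slice cluster assignment; the real assignment lives
# in Snuba's cluster settings and is resolved by get_cluster(storage_set, slice_id).
SLICE_TO_CLUSTER_HOST: Mapping[Tuple[str, int], str] = {
    ("generic_metrics_sets", 0): "clickhouse-slice-0:9000",
    ("generic_metrics_sets", 1): "clickhouse-slice-1:9000",
}

DEFAULT_CLUSTER_HOST = "clickhouse-default:9000"


def pick_cluster_host(storage_set: str, slice_id: Optional[int]) -> str:
    # Unsliced writers keep the default cluster; sliced writers get the cluster
    # registered for their (storage set, slice id) pair.
    if slice_id is None:
        return DEFAULT_CLUSTER_HOST
    return SLICE_TO_CLUSTER_HOST[(storage_set, slice_id)]


assert pick_cluster_host("generic_metrics_sets", None) == "clickhouse-default:9000"
assert pick_cluster_host("generic_metrics_sets", 1) == "clickhouse-slice-1:9000"
```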
20 changes: 16 additions & 4 deletions snuba/utils/streams/configuration_builder.py
@@ -10,19 +10,28 @@
from snuba.utils.streams.types import KafkaBrokerConfig


def _get_default_topic_configuration(topic: Optional[Topic]) -> Mapping[str, Any]:
def _get_default_topic_configuration(
topic: Optional[Topic], slice_id: Optional[int] = None
) -> Mapping[str, Any]:

if topic is not None:
return settings.KAFKA_BROKER_CONFIG.get(topic.value, settings.BROKER_CONFIG)
if slice_id is not None:
return settings.SLICED_KAFKA_BROKER_CONFIG.get(
(topic.value, slice_id), settings.BROKER_CONFIG
)
else:
return settings.KAFKA_BROKER_CONFIG.get(topic.value, settings.BROKER_CONFIG)
else:
return settings.BROKER_CONFIG


def get_default_kafka_configuration(
topic: Optional[Topic] = None,
slice_id: Optional[int] = None,
bootstrap_servers: Optional[Sequence[str]] = None,
override_params: Optional[Mapping[str, Any]] = None,
) -> KafkaBrokerConfig:
default_topic_config = _get_default_topic_configuration(topic)
default_topic_config = _get_default_topic_configuration(topic, slice_id)

return build_kafka_configuration(
default_topic_config, bootstrap_servers, override_params
@@ -32,14 +41,15 @@ def get_default_kafka_configuration(
def build_kafka_consumer_configuration(
topic: Optional[Topic],
group_id: str,
slice_id: Optional[int] = None,
auto_offset_reset: Optional[str] = None,
queued_max_messages_kbytes: Optional[int] = None,
queued_min_messages: Optional[int] = None,
bootstrap_servers: Optional[Sequence[str]] = None,
override_params: Optional[Mapping[str, Any]] = None,
strict_offset_reset: Optional[bool] = None,
) -> KafkaBrokerConfig:
default_topic_config = _get_default_topic_configuration(topic)
default_topic_config = _get_default_topic_configuration(topic, slice_id)

return _build_kafka_consumer_configuration(
default_topic_config,
@@ -55,11 +65,13 @@ def build_kafka_consumer_configuration(

def build_kafka_producer_configuration(
topic: Optional[Topic],
slice_id: Optional[int] = None,
bootstrap_servers: Optional[Sequence[str]] = None,
override_params: Optional[Mapping[str, Any]] = None,
) -> KafkaBrokerConfig:
broker_config = get_default_kafka_configuration(
topic=topic,
slice_id=slice_id,
bootstrap_servers=bootstrap_servers,
override_params=override_params,
)
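Spelled out, the fallback order in _get_default_topic_configuration is: with a slice id, only the sliced broker map is consulted before falling back to the global broker config; without one, the per-topic map applies; with no topic at all, the global config is returned. A standalone sketch with illustrative broker values (none of these hosts are real settings):

```python
from typing import Any, Mapping, Optional, Tuple

# Illustrative broker configs; real values come from Snuba's settings files.
BROKER_CONFIG: Mapping[str, Any] = {"bootstrap.servers": "kafka-default:9092"}
KAFKA_BROKER_CONFIG: Mapping[str, Mapping[str, Any]] = {
    "ingest-replay-events": {"bootstrap.servers": "kafka-replays:9092"},
}
SLICED_KAFKA_BROKER_CONFIG: Mapping[Tuple[str, int], Mapping[str, Any]] = {
    ("ingest-replay-events", 1): {"bootstrap.servers": "kafka-replays-slice-1:9092"},
}


def default_topic_configuration(
    topic: Optional[str], slice_id: Optional[int] = None
) -> Mapping[str, Any]:
    # Mirrors _get_default_topic_configuration above: when a slice id is given,
    # only the sliced map is consulted before the global broker config.
    if topic is None:
        return BROKER_CONFIG
    if slice_id is not None:
        return SLICED_KAFKA_BROKER_CONFIG.get((topic, slice_id), BROKER_CONFIG)
    return KAFKA_BROKER_CONFIG.get(topic, BROKER_CONFIG)


assert default_topic_configuration("ingest-replay-events", 1)["bootstrap.servers"] == "kafka-replays-slice-1:9092"
assert default_topic_configuration("ingest-replay-events")["bootstrap.servers"] == "kafka-replays:9092"
assert default_topic_configuration(None)["bootstrap.servers"] == "kafka-default:9092"
```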
21 changes: 21 additions & 0 deletions tests/datasets/test_table_storage.py
@@ -0,0 +1,21 @@
from snuba.datasets.storages.factory import get_writable_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.settings import SLICED_KAFKA_TOPIC_MAP


def test_get_physical_topic_name(monkeypatch) -> None: # type: ignore

monkeypatch.setitem(
SLICED_KAFKA_TOPIC_MAP, ("ingest-replay-events", 2), "ingest-replay-events-2"
)

storage_key = StorageKey.REPLAYS
storage = get_writable_storage(storage_key)

stream_loader = storage.get_table_writer().get_stream_loader()

default_topic_spec = stream_loader.get_default_topic_spec()

physical_topic_name = default_topic_spec.get_physical_topic_name(slice_id=2)

assert physical_topic_name == "ingest-replay-events-2"
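A possible companion test (not part of this PR) for the unsliced path, reusing the imports above; it assumes, like the test above, that the replays storage's default logical topic resolves to "ingest-replay-events" in the test settings:

```python
def test_get_physical_topic_name_unsliced() -> None:
    storage = get_writable_storage(StorageKey.REPLAYS)
    default_topic_spec = (
        storage.get_table_writer().get_stream_loader().get_default_topic_spec()
    )

    # With no slice id, the physical name is just the (possibly remapped)
    # logical topic name.
    assert default_topic_spec.get_physical_topic_name() == "ingest-replay-events"
```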