# rfc(decision): Batch multiple files together into single large file to improve network throughput #98

**README.md** (1 change: 1 addition):

- [0086-sentry-bundler-plugins-api](text/0086-sentry-bundler-plugins-api.md): Sentry Bundler Plugins API
- [0088-fix-memory-limitiations-in-session-replays-access-pattern](text/0088-fix-memory-limitiations-in-session-replays-access-pattern.md): Fix Memory Limitiations in Session Replay's Access Pattern
- [0092-replay-issue-creation](text/0092-replay-issue-creation.md): Replay Issue Creation
- [0098-store-multiple-replay-segments-in-a-single-blob](text/0098-store-multiple-replay-segments-in-a-single-blob.md): Store Multiple Replay Segments in a Single Blob
**text/0098-store-multiple-replay-segments-in-a-single-blob.md** (new file, 208 additions):

- Start Date: 2023-05-24
- RFC Type: decision
- RFC PR: https://github.com/getsentry/rfcs/pull/98
- RFC Status: draft

# Summary

Recording data is sent in segments, and each segment is written to its own file. Writing files is the most expensive component of our Google Cloud Storage usage and the most time-consuming step in our processing pipeline. By merging many segment files into a single file we can minimize our costs and maximize our processing pipeline's throughput.

# Motivation

1. Minimize costs.
2. Improve throughput.
3. Enable new features in a cost-effective manner.

# Background

This document exists to inform all relevant stakeholders of our proposal and seek feedback prior to implementation.

# Supporting Data

Google Cloud Storage lists the costs for writing and storing data as two separate categories. Writing a file costs $0.005 per 1000 files. Storing that file costs $0.02 per gigabyte. For the average file this works out to: $0.000000012 for storage and $0.00000005 for the write.

In practical terms, this means 75% of our spend is allocated to writing new files.

**Reviewer comment:** While the cost is mostly due to writes, is this cost problematic at scale? Is it a blocker to reaching higher scale, or are we simply looking for a more efficient option?

**Author reply (cmanallen):** I do not believe cost will prevent us from reaching greater scale. Write costs scale linearly. Pricing does not scale quite linearly, but if you're happy with how much GCS costs at low levels of demand you will be happy at peak demand.


# Proposal

First, a new table called "recording_byte_range" with the following structure is created:

| replay_id | segment_id | path | start | stop |
| --------- | ---------- | -------- | ----- | ----- |
| A | 0 | file.bin | 0 | 6241 |
| B | 0 | file.bin | 6242 | 8213 |
| A | 1 | file.bin | 8214 | 12457 |

Replay and segment ID should be self-explanatory. Path is the location of the blob in our bucket. Start and stop are integers which represent the index positions in an inclusive range. This range is a contiguous sequence of related bytes. In other words, the segment's compressed data is contained within the range.

Notice that each row in the example above points to the same file but with different start and stop locations. This implies that multiple segments and replays can be present in the same file. A single file can be shared by hundreds of different segments.

This table will need to support, at a minimum, one write per segment.
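
As a rough illustration of how ranges would be assigned, here is a minimal Python sketch of a byte-range row and of concatenating compressed segment payloads into one blob. The names (`RecordingByteRange`, `assign_ranges`) are illustrative, not a proposed schema or API.

```python
from dataclasses import dataclass


@dataclass
class RecordingByteRange:
    replay_id: str
    segment_id: int
    path: str   # location of the combined blob in the bucket
    start: int  # inclusive offset of the segment's first byte
    stop: int   # inclusive offset of the segment's last byte


def assign_ranges(path, segments):
    """Concatenate compressed segment payloads and record each segment's
    inclusive byte range within the combined blob."""
    rows, blob, offset = [], bytearray(), 0
    for replay_id, segment_id, payload in segments:
        blob.extend(payload)
        rows.append(RecordingByteRange(replay_id, segment_id, path, offset, offset + len(payload) - 1))
        offset += len(payload)
    return bytes(blob), rows
```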

Second, the Session Replay recording consumer will no longer commit blob data to Google Cloud Storage for each segment. Instead, it will buffer many segments and flush them together as a single blob to GCS, then make a bulk insertion into the database for tracking.

**Reviewer comment:** What is this buffering process going to look like? It will have to rely on persistent state to prevent you from losing chunks if a failover happens before a file is fully ingested while parts have already been committed in Kafka.

Also, a more high-level concern: if the idea is to build a generic system reusable by other features, relying on a specific consumer to do the buffering has the important implication that Kafka is always going to be a moving part of your system. Have you considered doing the buffering separately?

**Author reply (cmanallen), Jul 12, 2023:**

> What is this buffering process going to look like?

Like buffering would in our Snuba consumers. You keep an array of messages/rows/bytes in memory. When it comes time to flush, you zip them together in some protocol-specific format. In this case the protocol is byte concatenation.

> It will have to rely on persistent state to prevent you from losing chunks if a failover happens before a file is fully ingested while parts have already been committed in Kafka.

If we assume that the replays consumer pushes the bytes to another, generic Kafka consumer which buffers the files before upload, then the persistent state will be the log. Upload failures can be re-run from the last committed offset. Persistent failures would have to be handled as part of a DLQ and would require re-running, potentially introducing a significant amount of latency between (in the replays case) a billing outcome and the replay recording being made available.

Assuming this buffering/upload step exists inside our existing consumer (i.e. not a generic service), offsets will not be committed until after the batch has been uploaded.

> Also, a more high-level concern: if the idea is to build a generic system reusable by other features, relying on a specific consumer to do the buffering has the important implication that Kafka is always going to be a moving part of your system. Have you considered doing the buffering separately?

I have considered that. The idea being that process A buffers a bunch of file parts/offsets before sending the completed object to permanent storage, either through direct interaction, filestore, or a Kafka intermediary (or any number of intermediaries). The problem is then that each call site is responsible for the buffer, which is the hardest part of the problem.

Kafka is an important part of this system in my mind. When I wrote this document I relied on the guarantees it provides. I think if this is a generic service made available to the company then publishing to the topic should be a simple process for integrators. The fact that it's Kafka internally is an irrelevant implementation detail (to the caller).


```mermaid
flowchart
A[Wait For New Message] --> B[Process];
B --> C[Push to Buffer];
C --> D{Buffer Full?};
D -- No --> A;
D -- Yes --> E[Write Single File to GCS];
E --> F[Bulk Insert Byte Ranges];
F --> G[Clear Buffer];
G --> A;
```

Third, when a client requests recording data we will look it up in the "recording_byte_range" table. From its response, we will issue as many fetch requests as there are rows in the response. These requests may target a single file or many files. The files will be fetched with a special header that instructs the service provider to respond with only a subset of the bytes: specifically, the bytes that relate to our replay.

The response bytes will be decompressed, merged into a single payload, and returned to the user as they are now.
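
A minimal sketch of that read path, assuming GCS and zlib/gzip-compatible compression of each segment. The function name and row shape are illustrative; the ranged download call is the one shown under "Technical Details" below, and the merge here is plain concatenation of the decompressed payloads.

```python
import zlib

from google.cloud.storage import Client


def read_replay(bucket_name, rows):
    """Fetch each segment's inclusive byte range, decompress it, and merge
    the payloads in segment order. `rows` stands in for recording_byte_range
    records shaped like (segment_id, path, start, stop)."""
    bucket = Client().bucket(bucket_name)
    parts = []
    for segment_id, path, start, stop in sorted(rows):
        data = bucket.blob(path).download_as_bytes(start=start, end=stop)
        parts.append(zlib.decompress(data, zlib.MAX_WBITS | 32))  # accept zlib or gzip headers
    return b"".join(parts)
```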

# Drawbacks

1. Deleting data becomes tricky. See "Unresolved Questions".

# Unresolved Questions

1. Can we keep deleted data in GCS but make it inaccessible?

- User and project deletes:
  - We would remove all capability to access it, making it functionally deleted.
- GDPR deletes:
  - Would this require downloading the file, overwriting the relevant subsequence of bytes, and re-uploading a new file?
  - For single replays, small projects, or an infrequently used mechanism, this could be a valid deletion approach.
  - The data could be encrypted, with an encryption key stored on the metadata row, making the byte sequence unreadable upon row delete.

2. What datastore should we use to store the byte range information?

- Cassandra, Postgres, AlloyDB?
- Postgres likely won't be able to keep up long-term.
  - Especially if we write multiple byte ranges per segment.
- Cassandra could be a good choice, but it's not clear what operational burden this imposes on SnS and Ops.
- AlloyDB seems popular among the SnS team and could be a good choice.
  - It can likely interface with the Django ORM, but that is not clear to me at the time of writing.
- Whatever database we use must support deletes.

3. How fast can we encrypt and decrypt each segment?

# Extensions

By extending the schema of the "recording_byte_range" table to include a "type" column we can further reduce the number of bytes returned to the client. The client has different requirements for different sets of data. The player may only need the next `n` seconds worth of data, the console and network tabs may paginate their events, and the timeline will always fetch a simplified view of the entire recording.

With the byte range pattern in place these behaviors are possible and can be exposed to the client. The ultimate outcome of this change is faster loading times and the elimination of browser freezes and crashes from large replays.

This will increase the number of rows written to our database table. We would write four rows whereas with the original proposal we were only writing one. Therefore we should select our database carefully to ensure it can handle this level of write intensity.
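
For illustration, the extended table might look like the following for a single segment. The "type" values here are hypothetical; the exact taxonomy is undecided.

| replay_id | segment_id | type      | path     | start | stop |
| --------- | ---------- | --------- | -------- | ----- | ---- |
| A         | 0          | recording | file.bin | 0     | 6241 |
| A         | 0          | console   | file.bin | 6242  | 6510 |
| A         | 0          | network   | file.bin | 6511  | 7990 |
| A         | 0          | timeline  | file.bin | 7991  | 8213 |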

# Technical Details

## Storage Service Support

The following sections describe the pseudo-code necessary to fetch a range of bytes from each service provider, with links to the documentation where applicable.

**Google Cloud Storage**

```python
from google.cloud.storage import Blob

blob = Blob(filename, bucket)
# start and end are inclusive byte offsets.
blob.download_as_bytes(start=start, end=stop)
```

Source: https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.blob.Blob#google_cloud_storage_blob_Blob_download_as_bytes

**AWS S3**

```python
from boto3 import client

# The Range header takes an inclusive byte range.
response = client("s3", **auth).get_object(
    Bucket=bucket,
    Key=filename,
    Range=f"bytes={start}-{stop}",
)
response["Body"].read()
```

Source: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_object.html

**Filesystem**

```python
with open(filename, "rb") as f:  # binary mode: the offsets refer to raw bytes
    f.seek(start)
    f.read((stop - start) + 1)  # Range is inclusive.
```

## Consumer Buffering Mechanics

We will continue our approach of using _at-least once processing_. Each message we receive is guaranteed to be processed to completion regardless of error or interrupt. Duplicate messages are possible under this scheme and must be accounted for in the planning of each component.

**Buffer Location and Behavior**

The buffer is kept as an in-memory list inside the consumer process. Each message we receive is appended to the buffer. Afterwards, we check whether the buffer is full: if it is, we flush; otherwise we wait for another message.

**Reviewer comment:** It seems this will constrain you to a fairly small buffer size. It also ties the number of replicas of your consumer to the cost efficiency of the storage, which is quite undesirable. Assuming you are never going to commit on Kafka until the buffer is flushed (if you did, you would not be able to guarantee at-least-once):

- If you increase the number of replicas for any reason, each replica takes less traffic, so it takes longer to fill the buffer.
- Unless you tie the commit batch time and size to the number of replicas (which is undesirable; see the Snuba consumer), increasing the number of replicas would increase the number of files written per unit of time. Replicas and file size should not be connected to each other; otherwise we will have to keep in mind a lot of affected moving parts when scaling the consumer.

**Author reply (cmanallen):**

If the time to accumulate a batch is less than the time to upload a batch, then you need to add a replica. That's the only constraint. You get more efficiency at peak load, so it's best to run our replicas hot. The deadline will prevent the upload from sitting idle too long. The total scale factor will be determined by the number of machines we can throw at the problem.

Multi-processing/threading, I think, will be deadly to this project. So we will need a lot of single-threaded machines running.

I re-wrote this response several times. It's as disorganized as my thoughts are on this. Happy to hear critiques.

**Author reply (cmanallen):**

> Unless you tie the commit batch time and size to the number of replicas (which is undesirable; see the Snuba consumer), increasing the number of replicas would increase the number of files written per unit of time.

I think this is an okay outcome. If you double the number of replicas you halve the number of parts per file and double the number of files. That reduces cost efficiency but the throughput efficiency remains the same for each replica. Ignoring replica count, cost efficiency will ebb and flow with the variations in load we receive throughout the day.

We still come out ahead because the total number of files written per second is less than in the current implementation, which is one file per message.

**Author reply (cmanallen), Jul 18, 2023:**

> [...] otherwise we will have to keep in mind a lot of affected moving parts when scaling the consumer.

We should scale our replicas agnostic of the implementation of the buffer's flush mechanics. I mentioned above that cost efficiency is a hard target, so I don't think we should target it.

A deadline should be present to guarantee regular buffer commits, and a max buffer size should exist to prevent us from using too many resources. I think those two commit semantics save us from having to think about the implications of adding replicas. The throughput of a single machine may drop, but the risk of backlog decreases across the cluster.


This is a somewhat simplified view of what's happening. In reality we will have time-based flushing and a timeout mechanism for message listening. This ensures the buffer does not stay partially full indefinitely.

**Buffer Flush**

On flush the buffer will take every message in the list and merge them together into a single bytes object. This bytes object will then be uploaded to the storage service-provider. Upon successful upload the start and stop byte range values of each message are stored in a database in addition to other metadata such as their replay_id and segment_id. Finally, the last offset is committed to Kafka.
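
A minimal sketch of the buffer and its flush path, using the threshold values discussed in the review comments below (10 MB, 1,000 parts, five-second deadline). The `storage` and `database` objects and the filename helper are placeholders, not existing interfaces.

```python
import time


class RecordingBuffer:
    """Accumulates compressed segment payloads and flushes them as one blob."""

    def __init__(self, storage, database, max_bytes=10_000_000, max_parts=1_000, deadline=5.0):
        self.storage = storage    # placeholder wrapper around the GCS upload
        self.database = database  # placeholder wrapper around the bulk insert
        self.max_bytes = max_bytes
        self.max_parts = max_parts
        self.deadline = deadline
        self._reset()

    def _reset(self):
        self.parts = []           # list of (replay_id, segment_id, payload)
        self.size = 0
        self.first_seen = None

    def append(self, replay_id, segment_id, payload):
        if self.first_seen is None:
            self.first_seen = time.monotonic()
        self.parts.append((replay_id, segment_id, payload))
        self.size += len(payload)

    @property
    def is_due(self):
        elapsed = 0.0 if self.first_seen is None else time.monotonic() - self.first_seen
        return self.size >= self.max_bytes or len(self.parts) >= self.max_parts or elapsed >= self.deadline

    def flush(self, commit_offsets):
        if not self.parts:
            return
        path = self.storage.new_filename()  # placeholder
        blob, rows, offset = bytearray(), [], 0
        for replay_id, segment_id, payload in self.parts:
            blob.extend(payload)
            rows.append((replay_id, segment_id, path, offset, offset + len(payload) - 1))
            offset += len(payload)
        self.storage.upload(path, bytes(blob))  # one write for many segments
        self.database.bulk_insert(rows)         # recording_byte_range rows
        commit_offsets()                        # commit Kafka offsets only after both writes succeed
        self._reset()
```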

**Handling Consumer Restarts**

If the consumer restarts with a non-empty buffer, the last item's offset will not have been committed. When the consumer resumes, it will start processing from the last committed offset (i.e. the last item in the last successfully flushed buffer). The buffer will be rebuilt exactly as it was prior to restart.

**Reviewer comment:** What is the target size of each file you want to write and the target buffer size? The approach of throwing away the in-flight work is generally OK if the buffers are very small and the time it takes to reprocess is negligible. If this is not the case (you want large files) you may create a lot of noise when Kafka rebalances consumer groups and create a lot of backlog.

**Author reply (cmanallen), Jul 12, 2023:**

These are the commit criteria I have (naively) envisioned. All of these values are adjustable:

- Commit if filesize >= 10MB.
- Commit if num_parts >= 1000.
- Commit if the five-second deadline is exceeded.

10MB and 1000 parts can be scaled down to 1MB and 100 parts if they seem unrealistic. Any lower and the concept, I think, has run its course and is not worth pursuing.

> the time it takes to reprocess is negligible

I believe this will be the case. A generic consumer implementation should only be doing byte concatenation. But depending on size it may take a while to fetch those files over the network to even begin buffering.

I would prefer this generic consumer be deployed independently of getsentry so rebalances are less common.


**Storage Service Failure**

If we can not communicate with the storage provider we have several options.

1. Catch the exception and commit the offset anyway. This means all the segments in the buffer would be lost.
2. Do not catch the exception and let the consumer rebuild the buffer from its last saved offset.
3. Catch the exception and retry.

Option three is the preferred solution, but the semantics of the retry behavior can get complicated depending on how the system is constructed. For example: how long do you retry? How do retries affect message processing? Do you communicate with the service provider in a thread? If so, how do you manage resources?

A blocking approach is the simplest solution but it does not offer maximum throughput.
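
As one way option three could look, here is a sketch of blocking retries with exponential backoff and jitter. The upload call and the bounds are illustrative.

```python
import random
import time


def upload_with_retry(storage, path, blob, max_attempts=5):
    """Retry the upload a few times; if every attempt fails, let the exception
    propagate so the consumer restarts from its last committed offset (option
    two becomes the fallback)."""
    for attempt in range(max_attempts):
        try:
            return storage.upload(path, blob)  # placeholder upload call
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(min(2 ** attempt, 30) + random.random())
```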

**Managing Effects**

With a buffered approach most of the consumer's effects are accomplished in two bulk operations. However, click search, segment-0 outcome tracking, and segment-0 project lookup cannot be handled in this way. We address each case independently below; a sketch of the threaded publish step follows the list.

1. Click Tracking.
- Click events are published to the replay-event Kafka consumer.
- This publishing step is asynchronous and relies on threading to free up the main process thread.
- This operation is measured in microseconds and is not anticipated to significantly impact total throughput.
2. Outcome Tracking.
- Outcome events are published to the outcomes Kafka consumer.
- This publishing step is asynchronous and relies on threading to free up the main process thread.
- This operation only occurs for segment-0 events.
- This operation is measured in microseconds and is not anticipated to significantly impact total throughput.
3. Project lookup.
- Projects are retrieved by a cache lookup, or by querying PostgreSQL if the project is not cached.
- This operation typically takes >1ms to complete.
- This operation only occurs for segment-0 events.
- Querying this information in a tight loop is not an ideal situation.
- Forwarding the project_id to a secondary Kafka consumer would free up resources on our main consumer and allow the secondary consumer to optimize for this type of workload.
- Alternatively, another method for looking up the project's `has_replay` flag could be found.
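
A sketch of how the asynchronous publishing step could stay off the main processing thread. The producer interface is a placeholder for the consumer's existing Kafka producer.

```python
from concurrent.futures import ThreadPoolExecutor

# A small pool keeps the microsecond-scale produce calls off the main thread.
_publisher = ThreadPoolExecutor(max_workers=4)


def publish_async(producer, topic, payload):
    """Fire-and-forget publish; failures surface on the returned future."""
    return _publisher.submit(producer.produce, topic, payload)
```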

**Duplicate Message Handling**

1. Google Cloud Storage.
- Unique filename generation per buffer would mean that a segment could be present in multiple files.
- This has COGS implications but does not impact our application.
2. "recording_byte_range" table.
- Duplicate (replay_id, segment_id) pairs will be recorded in the table.
- A reader must either select distinct or group by the (replay_id, segment_id) pair; a reader-side sketch follows this list.
- Neither row has precedence over the other, but the filename value must come from the same row as the start and stop byte range values.
3. Outcome tracking.
- Duplicate outcomes will be recorded for a given replay.
- The replay_id functions as an idempotency token in the outcomes consumer and prevents the customer from being charged for the same replay multiple times.
4. Click tracking.
- Duplicate click events will be inserted for a replay, segment pair.
- This is an acceptable outcome and will not impact search behavior.
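
A reader-side deduplication sketch, assuming rows shaped like the `RecordingByteRange` example earlier: keep exactly one row per (replay_id, segment_id) pair so the path and the start/stop offsets always come from the same write.

```python
def dedupe_ranges(rows):
    """Keep the first row seen for each (replay_id, segment_id) pair."""
    unique = {}
    for row in rows:
        unique.setdefault((row.replay_id, row.segment_id), row)
    return list(unique.values())
```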

## Security

How do we prevent users from seeing segments that do not belong to them? The short answer is test coverage. Recording another segment's byte range would be the same class of bug as generating the incorrect filename under the current system. These outcomes are prevented with robust test coverage.

In the case of bad byte-range math, we do have some implicit protection. Each segment is compressed independently, so fetching a malformed byte range would yield unreadable data: a bad range either truncates the compression headers or includes unintelligible bytes at the beginning or end of the sequence. If we do manage to decompress a subset of a valid byte range, the decompressed output would be malformed JSON and would not be returnable to the user.

Additionally, each segment could be encrypted with an encryption key that is stored on its row. Requesting an invalid byte range would yield malformed data that could not be decrypted with that key.
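
As a sketch of the per-row encryption idea, here is an example using the `cryptography` package's Fernet recipe (an assumption; any authenticated encryption scheme would do). The key lives on the recording_byte_range row, so deleting the row, or just the key, renders the stored bytes unreadable.

```python
from cryptography.fernet import Fernet


def encrypt_segment(payload: bytes) -> tuple[bytes, bytes]:
    """Return (key, ciphertext); the key would be stored on the byte-range row."""
    key = Fernet.generate_key()
    return key, Fernet(key).encrypt(payload)


def decrypt_segment(key: bytes, ciphertext: bytes) -> bytes:
    # Raises cryptography.fernet.InvalidToken on a malformed byte range or a
    # deleted/rotated key.
    return Fernet(key).decrypt(ciphertext)
```

Note that with encryption the start and stop offsets would refer to the ciphertext, which is somewhat larger than the compressed payload.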