Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rfc(decision): Batch multiple files together into single large file to improve network throughput #98

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ This repository contains RFCs and DACIs. Lost?
- [0086-sentry-bundler-plugins-api](text/0086-sentry-bundler-plugins-api.md): Sentry Bundler Plugins API
- [0088-fix-memory-limitiations-in-session-replays-access-pattern](text/0088-fix-memory-limitiations-in-session-replays-access-pattern.md): Fix Memory Limitiations in Session Replay's Access Pattern
- [0092-replay-issue-creation](text/0092-replay-issue-creation.md): Replay Issue Creation
- [0098-store-multiple-replay-segments-in-a-single-blob](text/0098-store-multiple-replay-segments-in-a-single-blob.md): Store Multiple Replay Segments in a Single Blob
252 changes: 252 additions & 0 deletions text/0098-store-multiple-replay-segments-in-a-single-blob.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
- Start Date: 2023-05-24
- RFC Type: decision
- RFC PR: https://github.com/getsentry/rfcs/pull/98
- RFC Status: draft

# Summary

Recording data is sent in segments. Each segment is written to its own file. Writing files is the most expensive component of our Google Cloud Storage usage. It is also the most expensive component, in terms of time, in our processing pipeline. By merging many segment files together into a single file we can minimize our costs and maximize our processing pipeline's throughput.

# Motivation

1. Minimize costs.
2. Increase write throughput.
3. Enable new features in a cost-effective manner.

# Background

This document was originally written to respond to a percieved problem in the Session Replays recording consumer. However, upon exploring these ideas more it was determined that this could be more generally applied to the organization as a whole. For that reason I've made many of the names generic but have also retained many references to Session Replay.

# Supporting Data

Google Cloud Storage lists the costs for writing and storing data as two separate categories. Writing a file costs $0.005 per 1000 files. Storing that file costs $0.02 per gigabyte. For the average Session Replay file (with a retention period of 90 days) this works out to: $0.000000012 for storage and $0.00000005 for the write.

In practical terms, this means 75% of our spend is allocated to writing new files.
cmanallen marked this conversation as resolved.
Show resolved Hide resolved

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the cost is mostly due to write, at scale is this cost problematic? Like is it a blocker for reaching higher scale or are we simply looking for a more efficient option ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not believe cost will prevent us from reaching greater scale. Write cost scale linearly. Pricing does not quite scale linearly but if you're happy with how much GCS costs at low levels of demand you will be happy at peak demand.


# High Level Overview

1. We will store multiple "parts" per file.
- A "part" is a distinct blob of binary data.
- It exists as a subset of bytes within a larger set of bytes (referred to as a "file").
- A "part" could refer to a replay segment or to a sourcemap or anything that requires storage in a blob storage service.
2. Each "part" within a file will be encrypted.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since encryption is used here entirely to make deletion of individual parts quicker, could you please expand on:

  • whether we are confident that this is a use case worth optimizing for (deleting individual part quickly)
  • how would the process look like if we just accepted to rewrite files entirely in order to remove the part.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whether we are confident that this is a use case worth optimizing for (deleting individual part quickly)

It is necessary to support GDPR, project, and point deletes. I consider it a high priority. The alternative is rotating the file with the offending parts removed.

how would the process look like if we just accepted to rewrite files entirely in order to remove the part.

The operations is no particular order:

  1. Download the file
  2. Get the active byte ranges from the database.
  3. Remove all byte ranges from the file not found in the set of returned byte ranges.
  4. Delete all references to the file in the database.
  5. Upload the new file.
  6. Insert new offset rows.
  7. Delete the old file.

Repeat for every delete operation. Deletes must be single-threaded per file to prevent concurrent access. You can use kafka and partition on filename.

- Encryption provides instantaneous deletes (by deleting the row containing the encryption key) and removes the need to remove the byte sub-sequences from a blob.
- We will use envelope encryption to protect the contents of every file.
- https://cloud.google.com/kms/docs/envelope-encryption
- Related, contiguous byte ranges will be encrypted independently of the rest of the file.
- We will use KMS to manage our key-encryption-keys.
Copy link

@fpacifici fpacifici Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KMS ?
Are you talking about GCP KMS ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Google Key Management Service. Subject to change.

- Data-encryption-keys will be generated locally and will be unique.
3. Parts will be tracked in a metadata table on an AlloyDB instance(s).
- A full table schema is provided in the **Proposal** section.
- AlloyDB was chosen because its a managed database with strong point-query performace.
- The metadata table will contain the key used to decrypt the byte range.
4. On read, parts will be fetched without fetching the full file.
- More details are provided in the **Technical Details** section.

# Proposal

First, a new table called "file_part_byte_range" with the following structure is created:

| id | key | path | start | stop | dek | kek_id | created_at |
| --- | --- | -------- | ----- | ----- | -------- | ------ | ------------------- |
| 1 | A:0 | file.bin | 0 | 6241 | Aq3[...] | 1 | 2023-01-01T01:01:01 |
| 2 | B:0 | file.bin | 6242 | 8213 | ppT[...] | 1 | 2023-01-01T01:01:01 |
| 3 | A:1 | file.bin | 8214 | 12457 | 99M[...] | 1 | 2023-01-01T01:01:01 |

- The key field is client generated identifier.
- It is not unique.
- The value of the key field should be easily computable by your service.
Copy link

@fpacifici fpacifici Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not ensuring this is unique and avoiding having two keys ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least once delivery guarantees. If you're bulk inserting 1000 rows per upload then it becomes difficult to split out the rows that have duplicates.

But, thinking on this more... my plan is to use Kafka for this. Ordering guarantees could mean that the file batch is deterministic (assuming we don't have a stateful countdown timer). If 1 row exists in the database then they all must exist (therefore the batch was already written - commit offsets and move on). I suppose re-balancing would not affect this? A unique constraint could be possible.

That being said I think a deadline is important so files don't sit idle in the consumer for undefined periods of time.

Copy link
Member Author

@cmanallen cmanallen Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then again a read query against the unique key prior to the insert operation could satisfy this constraint. So yes I would say a unique constraint is possible here.

- In the case of Session Replay the key could be a concatenation of `replay_id` and `segment_id`.
- Illustrated above as `replay_id:segment_id`.
- Alternatively, a true composite key could be stored on a secondary table which contains a reference to the `id` of the `file_part_byte_range` row.
- Path is the location of the blob in our bucket.
- Start and stop are integers which represent the index positions in an inclusive range.
- This range is a contiguous sequence of related bytes.
- In other words, the entirety of the file part's encrypted data is contained within the range.
- The "dek" column is the **D**ata **E**ncryption **K**ey.
- The DEK is the key that was used to encrypt the byte range.
- The key itself is encrypted by the KEK.
- **K**ey **E**ncryption **K**ey.
- Encryption is explored in more detail in the following sections.
- The "kek_id" column contains the ID of the KEK used to encrypt the DEK.
- This KEK can be fetched from a remote **K**ey **M**anagement **S**ervice or a local database table.

Notice each row in the example above points to the same file but with different start and stop locations. This implies that multiple, independent parts can be present in the same file. A single file can be shared by hundreds of different parts.

Second, the Session Replay recording consumer will not commit blob data to Google Cloud Storage for each segment. Instead it will buffer many segments and flush them together as a single blob to GCS. Next it will make a bulk insertion into the database for tracking.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this buffering process going to look like?
It will have to rely on persistent state to prevent you from loosing chunks in case of failover before a file is fully ingested while parts have been already committed in Kafka.

Also a more high level concern with this.
If the idea is to build a generic system reusable by other features, relying on a specific consumer to do the buffering has the important implication that Kafka is always going to be a moving part of your system. Have you considered doing the buffering separately.

Copy link
Member Author

@cmanallen cmanallen Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this buffering process going to look like?

Like buffering would in our Snuba consumers. You keep an array of messages/rows/bytes in memory. When it comes time to flush you zip them together in some protocol specific format. In this case the protocol is byte concatenation.

It will have to rely on persistent state to prevent you from loosing chunks in case of failover before a file is fully ingested while parts have been already committed in Kafka.

If we assume that the replays consumer pushes the bytes to another, generic Kafka consumer which buffers the files before upload then the persistent state will be the log. Upload failures can be re-run from the last committed offset. Persistent failures would have to be handled as part of a DLQ and would require re-running. Potentially introducing a significant amount of latency between (in the replays case) a billing outcome and the replay recording being made available.

Assuming this buffering/upload step exists inside our existing consumer (i.e. not a generic service) then offsets will not be committed until after the batch has been uploaded.

Also a more high level concern with this.
If the idea is to build a generic system reusable by other features, relying on a specific consumer to do the buffering has the important implication that Kafka is always going to be a moving part of your system. Have you considered doing the buffering separately.

I have considered that. The idea being process A buffers a bunch of file-parts/offsets before sending the completed object to permanent storage either through direct interaction, filestore, or Kafka intermediary (or any number of intermediaries). The problem is then that each call site is responsible for the buffer which is the hardest part of the problem.

Kafka is an important part of this system in my mind. When I wrote this document I relied on the guarantees it provides. I think if this is a generic service made available to the company then publishing to the topic should be a simple process for integrators. The fact that its Kafka internally is an irrelevant implementation detail (to the caller).


```mermaid
flowchart
A[Wait For New Message] --> B[Process];
B --> C[Push to Buffer];
C --> D{Buffer Full?};
D -- No --> A;
D -- Yes --> E[Write Single File to GCS];
E --> F[Bulk Insert Byte Ranges];
F --> G[Clear Buffer];
G --> A;
```

## Writing

Writing a file part is a four step process.

First, the bytes must be encrypted with a randomly generated DEK. Second, the DEK is encrypted with a KEK. Third, the file is uploaded to the cloud storage provider. Fourth, a metadata row is written to the "file_part_byte_range" containing a key, the containing blob's filepath, start and stop offsets, and the encrypted DEK.

**A Note on Aggregating File Parts**

It is up to the implementer to determine how many parts exist in a file. An implementer may choose to store one part per file or may store an unbounded number of parts per file.

However, if you are using this system, it is recommended that more than one part be stored per file. Otherwise it is more economical to upload the file using simpler, more-direct methods.

## Reading

To read a file part the metadata row in the "file_part_byte_range" table is fetched. Using the filepath, starting byte, and ending byte we fetch the encrypted bytes from remote storage. Now that we have our encrypted bytes we can use the DEK we fetched from the "file_part_byte_range" table to decrypt the blob and return it to the user.

## Deleting

To delete a file part the metadata row in the "file_part_byte_range" table is deleted. With the removal of the DEK, the file part is no longer readable and is considered deleted.

Project deletes, user deletes, GDPR deletes, and user-access TTLs are managed by deleting the metadata row in the "file_part_byte_range" table.

File parts can be grouped into like-retention-periods and deleted manually or automatically after expiry. However, in the case of replays, storage costs are minor. We will retain our encrypted segment data for the maximum retention period of 90 days.

## Key Rotation

If a KEK is compromised and needs to be rotated we will need to follow a four step process. First, we query for every row in the "file_part_byte_range" table whose DEK was encrypted with the old KEK. Second, we will decrypt every DEK with the old KEK. Third, we will encrypt the DEK with a new KEK. Fourth, the old KEK is dropped.

DEKs are more complicated to rotate as it requires modifying the blob. However, because DEKs are unique to a byte range within a single file we have a limited surface area for a compromised key to be exploited. To rotate a DEK first download the blob, second decrypt the byte range with the compromised DEK, third generate a new DEK, fourth encrypt the payload with the new DEK, fifth encrypt the new DEK with any KEK, and sixth upload and re-write the metadata rows with the new offsets.
cmanallen marked this conversation as resolved.
Show resolved Hide resolved

# Drawbacks

# Questions

1. If KEKs are managed in a remote service how do we manage outages?
- We manage it in the same way we manage an outage of any other remote service.
- We will need to backlog or otherwise 400/404.
- KEKs have the benefit of _probably_ not blocking ingest as the key will be cached for long stretches of time (24 hours) and can be used for longer periods of time if a new key can not be fetched.
2. How will read efficiency be impacted if we rely on a remote service to decrypt blob data?
- It will have some cost but hopefully that cost is minimized by the constraints of your system.
- For example, Session Replay fetches multiple segment blobs in a single request. At most we will need to fetch two keys (and in the majority of cases a single key) to decrypt the segments.
- This key fetching latency is immaterial to the total latency of the request.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the self hosted open source implementation for this

Copy link
Member Author

@cmanallen cmanallen Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Environment variable named SECRET_KEY.

3. How will key rotation work in a production system?
- Hopefully it will be a rare event.
- KEK rotation will require re-encrypting every DEK encrypted with the KEK (typically everything in a ~24-hour period).
- DEK rotation will require re-encrypting a sequence of bytes in a blob.

# Extensions

By extending the schema of the "file_part_byte_range" table to include a "type" column we can further reduce the number of bytes returned to the client. The client has different requirements for different sets of data. The player may only need the next `n` seconds worth of data, the console and network tabs may paginate their events, and the timeline will always fetch a simplified view of the entire recording.

With the byte range pattern in place these behaviors are possible and can be exposed to the client. The ultimate outcome of this change is faster loading times and the elimination of browser freezes and crashes from large replays.

This will increase the number of rows written to our database table. We would write four rows whereas with the original proposal we were only writing one. Therefore we should select our database carefully to ensure it can handle this level of write intensity.

# Technical Details

## Storage Service Support

The following sections describe the psuedo-code necessary to fetch a range of bytes from a service provider and also links to the documentation where applicable.

**Google Cloud Storage**

```python
from google.cloud.storage import Blob

blob = Blob(filename, bucket)
blob.download_as_bytes(start=start, end=stop)
```

Source: https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.blob.Blob#google_cloud_storage_blob_Blob_download_as_bytes

**AWS S3**

```python
from boto3 import client

response = client("s3", **auth).get_object(
Bucket=bucket,
Key=filename,
Range=f"bytes={start}-{stop}",
)
response["Body"].read()
```

Source: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_object.html

**Filesystem**

```python
with open(filename, "r") as f:
f.seek(start)
f.read((stop - start) + 1) # Range is inclusive.
```

## Consumer Buffering Mechanics

The following section is highly specific to the Session Replay product.

We will continue our approach of using _at-least once processing_. Each message we receive is guaranteed to be processed to completion regardless of error or interrupt. Duplicate messages are possible under this scheme and must be accounted for in the planning of each component.

**Buffer Location and Behavior**

The buffer is kept as an in-memory list inside the consumer process. For each message we receive we append the message to the buffer. Afterwards, we check if the buffer is full. If it is we flush. Else we wait for another message.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this will constrain you to a fairly small buffer size.
Also it ties the number of replicas of your consumer to the cost efficiency of the storage, which is quite undesirable:
Assuming you are never going to commit on kafka untill the buffer is flushed (if you did you would not be able to guarantee at least once):

  • If you increase the number of replicas for any reason, each replicas takes less traffic, thus it takes longer to fill the buffer.
  • Unless you tie the commit batch time and size to the number of replicas (which is undesirable - see the snuba consumer), increasing the number of replicas would increase the number of files written per unit of time.
    replicas and file size should not be connected to each other, otherwise we will have to keep in mind a lot of affected moving parts when scaling the consumer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the time to accumulate a batch is less than the time to upload a batch then you need to add a replica. That's the only constraint. You get more efficiency at peak load so its best to run our replicas hot. The deadline will prevent the upload from sitting idle too long. The total scale factor will be determined by the number of machines we can throw at the problem.

Multi-processing/threading, I think, will be deadly to this project. So we will need a lot of single-threaded machines running.

I re-wrote this response several times. Its as disorganized as my thoughts are on this. Happy to hear critiques.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless you tie the commit batch time and size to the number of replicas (which is undesirable - see the snuba consumer), increasing the number of replicas would increase the number of files written per unit of time.

I think this is an okay outcome. If you double the number of replicas you halve the number of parts per file and double the number of files. That reduces cost efficiency but the throughput efficiency remains the same for each replica. Ignoring replica count, cost efficiency will ebb and flow with the variations in load we receive throughout the day.

We still come out ahead because the total number of files written per second is less than the current implementation which is 1 file per message.

Copy link
Member Author

@cmanallen cmanallen Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[...] otherwise we will have to keep in mind a lot of affected moving parts when scaling the consumer.

We should scale our replicas agnostic to the implementation of the buffer's flush mechanics. I mentioned above about cost-efficiency being a hard target. So I don't think we should target it.

A deadline should be present to guarantee regular buffer commits and a max buffer size should exist to prevent us from using too many resources. I think those two commit semantics save us from having to think about the implications of adding replicas. The throughput of a single machine may drop but the risk of back log has decreased across the cluster.


This is a somewhat simplified view of whats happening. In reality we will have time based flushing and a timeout mechanism for message listening. This ensures the buffer does not stay partially full indefinitely.

**Buffer Flush**

On flush the buffer will take every message in the list and merge them together into a single bytes object. This bytes object will then be uploaded to the storage service-provider. Upon successful upload the start and stop byte range values of each message are stored in a database in addition to other metadata such as their replay_id and segment_id. Finally, the last offset is committed to Kafka.

**Handling Consumer Restarts**

If the consumer restarts with a non-empty buffer, the buffer's last item's offset will not be committed. When the consumer resumes it will start processing from the last offset committed (i.e. the last item in the last successfully-flushed-buffer). The buffer will be rebuilt exactly as it was prior to restart.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the target size of each file you want to write and the target buffer size ?
The approach of throwing away the in flight work is generally ok if the buffer are very small and the time it takes to reprocessing is negligible. If this is not the case (you want large files) you may create a lot of noise when kafka rebalances consumer groups and create a lot of backlog.

Copy link
Member Author

@cmanallen cmanallen Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the commit criterion I have (naively) envisioned. All these values are adjustable.

  • Commit if filesize >= 10MB.
  • Commit if num_parts >= 1000.
  • Commit if five-second-deadline exceeded.

10MB and 1000 parts can be scaled down to 1MB and 100 parts if they seem unrealistic. Any lower and the concept, I think, has run its course and is not worth pursuing.

the time it takes to reprocessing is negligible

I believe this will be the case. A generic consumer implementation should only be doing byte concatenation. But depending on size it may take a while to fetch those files over the network to even begin buffering.

I would prefer this generic consumer deployed independently of getsentry so re-balances are less common.


**Storage Service Failure**

If we can not communicate with the storage provider we have several options.

1. Catch the exception and commit the offset anyway. This means all the segments in the buffer would be lost.
cmanallen marked this conversation as resolved.
Show resolved Hide resolved
2. Do not catch the exception and let the consumer rebuild the buffer from its last saved offset.
3. Catch the exception and retry.

Option three is the preferred solution but the semantics of the retry behavior can get complicated depending on how the system is constructed. For example, how long do you retry? How do retries affect message processing? Do you communicate with the service provider in a thread? If so how do you manage resources?

A blocking approach is the simplest solution but it does not offer maximum throughput.

**Managing Effects**

With a buffered approach most of the consumer's effects are accomplished in two bulk operations. However, click search, segment-0 outcome tracking, and segment-0 project lookup are not handle-able in this way. We will address each case independently below.

1. Click Tracking.
- Click events are published to the replay-event Kafka consumer.
- This publishing step is asynchronous and relies on threading to free up the main process thread.
- This operation is measured in microseconds and is not anticipated to significantly impact total throughput.
2. Outcome Tracking.
- Outcome events are published to the outcomes Kafka consumer.
- This publishing step is asynchronous and relies on threading to free up the main process thread.
- This operation only occurs for segment-0 events.
- This operation is measured in microseconds and is not anticipated to significantly impact total throughput.
3. Project lookup.
- Projects are retrieved by a cache lookup or querying PostgreSQL if it could not be found.
- This operation typically takes >1ms to complete.
- This operation only occurs for segment-0 events.
- Querying this information in a tight loop is not an ideal situation.
- Forwarding the project_id to a secondary Kafka consumer would free up resources on our main consumer and allow the secondary consumer to optimize for this type of workload.
- Alternatively, another method for looking up the project's `has_replay` flag could be found.

**Duplicate Message Handling**

1. Google Cloud Storage.
- Unique filename generation per buffer would mean that a segment could be present in multiple files.
cmanallen marked this conversation as resolved.
Show resolved Hide resolved
- This has COGS implications but does not impact our application.
2. "file_part_byte_range" table.
- Duplicate replay, segment ID pairs will be recorded in the table.
- A reader must either select distinct or group by the replay_id, segment_id pair.
- Neither row has precendence over the other but the filename value must come from the same row as the start and stop byte range values.
3. Outcome tracking.
- Duplicate outcomes will be recorded for a given replay.
- The replay_id functions as an idempotency token in the outcomes consumer and prevents the customer from being charged for the same replay multiple times.
4. Click tracking.
- Duplicate click events will be inserted for a replay, segment pair.
- This is an acceptable outcome and will not impact search behavior.