feat(batch-exports): Add Azure Blob Storage as batch export destination #43977
Conversation
4 issues found across 24 files:

- docker-compose.base.yml:193 (P2): Named volume `azurite-data` is used but not defined in the top-level `volumes:` section. Add `azurite-data:` to the volumes section at the bottom of the file.
- products/batch_exports/backend/tests/temporal/destinations/azure_blob/test_workflow_with_azure_account.py:69 (P2): The `client.close()` call won't execute if `delete_blob` raises an exception during cleanup, causing a resource leak. Wrap cleanup in try/finally to ensure the client is always closed.
- products/batch_exports/backend/tests/temporal/destinations/azure_blob/utils.py:163 (P2): Weak test assertion: if `expected_session_ids` is empty (no events have properties with `$session_id`), this assertion will always pass, since an empty set is a subset of any set. Consider adding a check that `expected_session_ids` is non-empty, or using a stricter equality check if that's the intended behavior.
- products/batch_exports/backend/temporal/destinations/azure_blob_batch_export.py:145 (P2): Accessing `COMPRESSION_EXTENSIONS[compression]` without validation will raise a cryptic `KeyError` if an unsupported compression type is passed. Consider adding error handling similar to the file format check above.
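A minimal sketch of the try/finally shape the second finding asks for; the `client`, container, and blob names are assumptions about the test fixture, not code from this PR:

```python
# Sketch only: wraps blob cleanup in try/finally so the Azure client is closed
# even if delete_blob raises; `client` is assumed to be an async BlobServiceClient.
async def cleanup_test_blob(client, container: str, blob_name: str) -> None:
    try:
        container_client = client.get_container_client(container)
        await container_client.delete_blob(blob_name)
    finally:
        await client.close()
```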
    batch_export_id=inputs.batch_export_id,
    data_interval_start=inputs.data_interval_start,
    data_interval_end=inputs.data_interval_end,
    max_record_batch_size_bytes=1024 * 1024 * 60,  # 60MB
transformer = ParquetStreamTransformer(
    compression=inputs.compression,
    include_inserted_at=True,
    max_file_size_bytes=inputs.max_file_size_mb * 1024 * 1024 if inputs.max_file_size_mb else 0,
await blob_client.upload_blob(
    bytes(self.current_buffer),
    overwrite=True,
    max_concurrency=self.max_concurrency,
)
The Azure SDK's upload_blob() handles what we manually implemented for S3:
- Automatically chunks large blobs into blocks (configurable via max_block_size, default is 4MB)
- Uploads blocks in parallel (configurable via max_concurrency)
- Retries with exponential backoff by default (ExponentialRetry policy)
- Commits the block list atomically after all uploads complete
Refs:
- Upload Blob API
- Upload with configuration options
- Retry policy configuration
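For reference, a minimal sketch of where those knobs sit on the async client; the connection string, container, blob name, and the chosen values are placeholders, not what this PR ships:

```python
# Sketch only: shows where max_block_size and max_concurrency plug into the
# Azure SDK's async client; values here are illustrative placeholders.
from azure.storage.blob.aio import BlobServiceClient


async def upload_export_file(connection_string: str, container: str, blob_name: str, data: bytes) -> None:
    # max_block_size controls how the SDK chunks the payload into blocks (SDK default is 4MB).
    service = BlobServiceClient.from_connection_string(connection_string, max_block_size=4 * 1024 * 1024)
    async with service:
        blob_client = service.get_blob_client(container=container, blob=blob_name)
        # Blocks are uploaded in parallel up to max_concurrency, retried with the
        # SDK's default exponential backoff, and committed atomically at the end.
        await blob_client.upload_blob(data, overwrite=True, max_concurrency=4)
```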
For the future: Does this mean that we cannot call upload_blob concurrently and upload parts out of order? This is possible with S3, and it is something we have been testing.
Yes, Azure supports the same pattern: stage_block() + commit_block_list() is Azure's equivalent of S3's multipart upload (upload_part() + complete_multipart_upload()). You can stage blocks concurrently and out of order, then commit them at the end.
Currently upload_blob() handles this internally, but we can switch to the primitives if we need manual control.
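A rough sketch of that pattern, shown with the sync client for brevity; the block IDs and chunking here are illustrative, not code from this PR:

```python
# Rough sketch of Azure's block-blob primitives, analogous to S3 multipart upload.
# Block IDs just need to be unique, same-length base64 strings within the blob.
import base64

from azure.storage.blob import BlobBlock, BlobClient


def upload_in_blocks(blob_client: BlobClient, parts: list[bytes]) -> None:
    block_ids: list[str] = []
    for index, chunk in enumerate(parts):
        block_id = base64.b64encode(f"block-{index:06d}".encode()).decode()
        # Blocks can be staged concurrently and out of order.
        blob_client.stage_block(block_id=block_id, data=chunk)
        block_ids.append(block_id)

    # Nothing is visible until the block list is committed; the commit is atomic.
    blob_client.commit_block_list([BlobBlock(block_id=block_id) for block_id in block_ids])
```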
Thanks, not needed at the moment. I was curious as it may be an optimization we explore in the future.
| "BATCH_EXPORT_AZURE_BLOB_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 0, type_cast=int | ||
| ) | ||
| BATCH_EXPORT_AZURE_BLOB_MAX_CONCURRENT_UPLOADS: int = get_from_env( | ||
| "BATCH_EXPORT_AZURE_BLOB_MAX_CONCURRENT_UPLOADS", 5, type_cast=int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defaulting to 5 concurrent uploads, matching S3.
3. Generate a connection string with access to the container

> [!NOTE]
> For PostHog employees, check the password manager for Azure Storage development credentials.
Note: I'm assuming PostHog has internal Azure creds here. Please let me know if this isn't the case :) happy to update.
@pytest.fixture(autouse=True)
def mock_kafka_producer_for_tests(monkeypatch):
    """Mock Kafka producer to prevent connection attempts during tests.

    The try_produce_app_metrics function attempts to connect to Kafka to send
    metrics. In test environments without Kafka, this causes workflow failures
    even when the actual export succeeds.
    """
    mock_producer = MagicMock()
    mock_producer.__aenter__ = AsyncMock(return_value=mock_producer)
    mock_producer.__aexit__ = AsyncMock(return_value=None)
    mock_producer.send = AsyncMock()
    mock_producer.flush = AsyncMock()

    monkeypatch.setattr(
        "products.batch_exports.backend.temporal.batch_exports.aiokafka.AIOKafkaProducer",
        lambda **kwargs: mock_producer,
    )
This fixture mocks the Kafka producer for internal telemetry (try_produce_app_metrics), not Kafka as a destination. Without this, I saw tests fail trying to connect to kafka:9092 (Docker hostname) from my host machine, even when the actual export succeeds. Kafka destination tests are unaffected since they manage their own connections.
This has not been an issue for me (nor has anybody in the team mentioned it). I think the issue may be local to your machine. Is kafka somewhere in your /etc/hosts?
Ok, this one is not on you, for some reason we have tagged that step as deprecated (see: https://posthog.com/handbook/engineering/manual-dev-setup).
I'm having a chat with the team about this, but if I had to guess I would say kafka not being in your /etc/hosts is likely the error and we don't need this fixture.
Checked: flox activate should do this for you now.
While I've kept Azure SDK defaults (or followed patterns from other destinations), I wanted to get the team's input on how we approach export performance tuning for this new destination.
Parameters that affect performance:
Questions for the team:
Just wanted to touch on this before we merge, since it could affect throughput at scale.
Another question: do we want to have this under a feature flag for gradual rollout, or do we think internal testing (with a real Azure account) should be sufficient? I'm leaning toward internal testing being sufficient, since the implementation follows the same patterns as S3 and the blast radius is limited to users who explicitly configure this destination. We can monitor early usage closely after rollout to catch any issues.
Not sure why team devex was tagged as reviewer here, we can take it.
Will review in the coming days.
tomasfarias
left a comment
I lied, reviewing this now.
Overall, pretty good work. There are a few things I'd like to discuss before giving this the green light, though; see my comments below.
posthog/migrations/max_migration.txt
@@ -1 +1 @@
-0952_add_billable_action_to_hogflows
+0953_add_azure_blob_destination
This PR is pretty large, so it will take a while to review, and with the rate at which we merge stuff to master there is a pretty good chance this migration will raise conflicts constantly.
Do you mind me pushing commits to bump the migration so that we can get it deployed when there is an opening? @buildwithmalik
Sure! Go ahead.
I am of the opinion that any performance work not based on production data is pointless. Let's ship and adjust over time. We regularly look at performance internally.
It was local testing, so take this with a big grain of salt.
Fine to leave it as it is. We can add more configuration settings over time.
Some internal discussions about this are ongoing, but nothing concrete yet, so there's nothing to be concerned about in this PR.
I think it's better to include a feature flag. I do agree with you that internal testing is sufficient, and that is generally what we do, but a feature flag gives us a cleaner and more straightforward way to enable the destination once testing is done. This is especially relevant given we'll have multiple PRs for this, so a feature flag helps coordinate everything. In the most likely scenario, the feature flag stays disabled until the day we flip it on for everyone, and then we promptly delete it. This process doesn't take long, though. I just like the coordination aspect of having a feature flag.
Force-pushed from cd011bf to 6396e15.
Changes in latest commits:
3 issues found across 7 files (changes from recent commits):

- products/batch_exports/backend/temporal/destinations/azure_blob_batch_export.py:190 (P3): `include_file_number` should be based on whether splitting is enabled (truthy `max_file_size_mb`), not just `is not None`; otherwise `max_file_size_mb=0` produces `-0` filenames without actually splitting.
- products/batch_exports/backend/temporal/destinations/azure_blob_batch_export.py:233 (P1): Compression validation should also enforce `SUPPORTED_COMPRESSIONS` for the chosen file format (and ideally normalize file format casing), otherwise some format/compression combinations can pass validation but fail later with a retryable exception.
- products/batch_exports/backend/temporal/destinations/s3_batch_export.py:307 (P1): Compression validation is too permissive: it checks only that the compression string exists in `COMPRESSION_EXTENSIONS`, not that it's supported for the selected `file_format` (e.g., JSONLines+zstd will fail later with a retryable `ValueError`). Validate against `SUPPORTED_COMPRESSIONS[file_format]` (or equivalent) and raise `UnsupportedCompressionError` early.
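A minimal sketch of the early validation these findings describe; `SUPPORTED_COMPRESSIONS` and `UnsupportedCompressionError` are the names used in the review comments, while the mapping values and module layout here are assumptions:

```python
# Sketch only: validates compression per file format up front, so bad
# combinations fail fast instead of raising a retryable error mid-export.
SUPPORTED_COMPRESSIONS: dict[str, set[str | None]] = {
    # Illustrative values; the real mapping lives in the batch exports module.
    "Parquet": {None, "gzip", "snappy", "zstd", "brotli"},
    "JSONLines": {None, "gzip", "brotli"},
}


class UnsupportedCompressionError(Exception):
    """Raised when a compression is not valid for the selected file format."""


def validate_compression(file_format: str, compression: str | None) -> None:
    allowed = SUPPORTED_COMPRESSIONS.get(file_format)
    if allowed is None:
        raise ValueError(f"Unsupported file format: {file_format!r}")
    if compression not in allowed:
        raise UnsupportedCompressionError(
            f"Compression {compression!r} is not supported for {file_format!r}; "
            f"expected one of {sorted(c for c in allowed if c)} or no compression"
        )
```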
    inputs.file_format,
    inputs.compression,
    use_new_file_naming_scheme=inputs.max_file_size_mb is not None,
def test_get_object_key(inputs, expected):
would probably make sense to move this to products/batch_exports/backend/tests/temporal/destinations/test_utils.py since the function being tested is now in utils.py
1. Ensure the development Docker stack is running (includes Azurite):

   ```bash
   docker compose -f docker-compose.dev.yml up -d
   ```
Think this needs updating now that Azurite has moved into its own docker compose file, maybe:
-docker compose -f docker-compose.dev.yml up -d
+docker compose -f docker-compose.dev.yml -f docker-compose.batch-exports.yml up -d
@rossgray
Also waiting on feedback for the CI changes discussion: #43977 (comment)
tomasfarias
left a comment
I don't have anything more to add. I'll test this locally, update the migration, and get it merged. Great work here.
rossgray
left a comment
Happy with the changes here too. Really good work, especially for a first PR 🎉
Great news, thanks a lot team! @tomasfarias @rossgray Thanks for the thorough reviews and quick feedback, I've learned a lot through this PR. I'm also super happy to add value for PostHog customers. I found PostHog when I was looking for a cheap way to measure analytics for my side project and can't believe that I might merge code here. In other news, the latest commit (502c498) includes the
… into shared get_object_key function
- Move S3 key generation tests from s3/test_utils.py to shared test_utils.py
- Update README to include batch-exports compose file in docker command
Start Azurite container in backend tests for Azure Blob Storage tests.
Force-pushed from 502c498 to e6e6849.
Rebased to bump migration numbers and add a few small changes:
Tested this locally. Both local tests (w/ Azurite) and tests with a real Azure account are working. I've approved CI to run and will merge as soon as it's green.
oof, lots of typing stuff is red, I'll take care of that...
1 issue found across 7 files (changes from recent commits):

- products/batch_exports/backend/temporal/destinations/azure_blob_batch_export.py:354 (P2): The `integration_id` validation occurs after `start_batch_export_run` has already executed. If `integration_id` is `None`, a batch export run record will be created but the workflow will immediately fail, potentially leaving an orphaned run record. Consider moving this validation before `start_batch_export_run` is called.
except OverBillingLimitError:
    return

if inputs.integration_id is None:
P2: The integration_id validation occurs after start_batch_export_run has already executed. If integration_id is None, a batch export run record will be created but the workflow will immediately fail, potentially leaving an orphaned run record. Consider moving this validation before start_batch_export_run is called.
@@ -348,6 +351,9 @@ async def run(self, inputs: AzureBlobBatchExportInputs):
     except OverBillingLimitError:
         return
+    if inputs.integration_id is None:
+        raise AzureBlobIntegrationNotFoundError(inputs.integration_id, inputs.team_id)
+
no, I don't think I will
Should have fixed all Python typing in the latest commit. Frontend CI will fail as Azure is missing an icon. I'll add that too.
Force-pushed from f24816e to e1ca51a.
I need to open my own version of this PR to get one of our bots to do its magic. Will cherry-pick commits here.
@tomasfarias Let me know if I can help with the failing tests or if you need me to push any fixes 🙂
@buildwithmalik All good, just need our bots to do their thing. I will get this merged today.
@buildwithmalik aaand shipped! Thanks for your contribution! Let me know if you wish to take on the last frontend bits to enable this; otherwise we can also pick it up from here. You have already done most of the work. The plan is to turn the feature flag on for everyone very quickly after the frontend work is finished.
Yayy! 🎉
Yup! I have some of the front-end changes ready; I should have a PR out soon! 🙂
Problem
Customers using Azure infrastructure currently can't export PostHog data directly to Azure Blob Storage. This adds Azure Blob as a new batch export destination.
Addresses #41383
This is PR 1 of 2 (backend only). Frontend changes will be in a follow-up PR. Gated behind a feature flag until frontend is ready.
Changes
Adds AzureBlob as a new batch export destination type with a Temporal workflow (azure-blob-export) that uses the existing internal S3 staging pipeline, so no new ClickHouse query logic needed.
Credentials are stored via a new AzureBlobIntegration class (connection string auth). Added Azurite emulator to docker-compose for local dev.
Test suite covers end-to-end tests for format/compression combinations, error handling, and file splitting.
How did you test this code?
Manual testing: Ran full workflow with Temporal workers, created exports via Django shell, verified blobs appear in Azurite with correct format/compression.
Test Coverage
- test_activity_data_export.py
- test_workflow_execution.py
- test_workflow_error_states.py
- test_workflow_with_azure_account.py
- test_utils.py

Parametrized combinations tested:
Changelog: Is this feature complete?
No - backend only. Frontend changes coming in a fast-follow PR.