4 changes: 2 additions & 2 deletions .github/workflows/ci-backend.yml
@@ -683,8 +683,8 @@ jobs:
while [ $attempt -le $max_attempts ]; do
echo "Attempt $attempt of $max_attempts to start stack..."

if docker compose -f docker-compose.dev.yml down && \
docker compose -f docker-compose.dev.yml up -d; then
if docker compose -f docker-compose.dev.yml -f products/batch_exports/backend/tests/docker-compose.yml down && \
docker compose -f docker-compose.dev.yml -f products/batch_exports/backend/tests/docker-compose.yml up -d; then
echo "Stack started successfully"
exit 0
fi
2 changes: 2 additions & 0 deletions frontend/src/generated/core/api.schemas.ts
@@ -481,6 +481,7 @@ export interface CreateGroupApi {
* `clickup` - Clickup
* `vercel` - Vercel
* `databricks` - Databricks
* `azure-blob` - Azure Blob
*/
export type KindEnumApi = (typeof KindEnumApi)[keyof typeof KindEnumApi]

@@ -508,6 +509,7 @@ export const KindEnumApi = {
clickup: 'clickup',
vercel: 'vercel',
databricks: 'databricks',
'azure-blob': 'azure-blob',
} as const

/**
1 change: 1 addition & 0 deletions frontend/src/lib/constants.tsx
@@ -224,6 +224,7 @@ export const FEATURE_FLAGS = {
AI_SESSION_SUMMARY: 'ai-session-summary', // owner: #team-replay
AMPLITUDE_BATCH_IMPORT_OPTIONS: 'amplitude-batch-import-options', // owner: #team-ingestion
BATCH_EXPORT_NEW_LOGS: 'batch-export-new-logs', // owner: #team-batch-exports
BATCH_EXPORTS_AZURE_BLOB: 'azure-blob-batch-exports', // owner: #team-batch-exports
BATCH_EXPORTS_DATABRICKS: 'databricks-batch-exports', // owner: @rossgray #team-batch-exports
BACKFILL_WORKFLOWS_DESTINATION: 'backfill-workflows-destination', // owner: #team-batch-exports
BING_ADS_SOURCE: 'bing-ads-source', // owner: @jabahamondes #team-web-analytics
@@ -5,6 +5,7 @@ import { BatchExportService } from '~/types'
import IconWorkflows from 'public/hedgehog/mail-hog.png'
import IconHTTP from 'public/hedgehog/running-hog.png'
import IconS3 from 'public/services/aws-s3.png'
import IconAzureBlob from 'public/services/azure.png'
import IconBigQuery from 'public/services/bigquery.png'
import IconDatabricks from 'public/services/databricks.png'
import IconPostgres from 'public/services/postgres.png'
@@ -16,6 +17,7 @@ export function getBatchExportUrl(service: BatchExportService['type']): string {
}

export const BATCH_EXPORT_ICON_MAP: Record<BatchExportService['type'], string> = {
AzureBlob: IconAzureBlob,
BigQuery: IconBigQuery,
Postgres: IconPostgres,
Redshift: IconRedshift,
16 changes: 16 additions & 0 deletions frontend/src/types.ts
@@ -5335,6 +5335,20 @@ export type BatchExportServiceDatabricks = {
}
}

export type BatchExportServiceAzureBlob = {
type: 'AzureBlob'
integration: number
config: {
container_name: string
prefix: string
compression: string | null
file_format: string
max_file_size_mb: number | null
exclude_events: string[]
include_events: string[]
}
}

export type BatchExportRealtimeDestinationBackfill = {
type: 'Workflows'
config: {}
@@ -5351,6 +5365,7 @@ export const BATCH_EXPORT_SERVICE_NAMES: BatchExportService['type'][] = [
'Redshift',
'HTTP',
'Databricks',
'AzureBlob',
'Workflows',
]
export type BatchExportService =
@@ -5361,6 +5376,7 @@ export type BatchExportService =
| BatchExportServiceRedshift
| BatchExportServiceHTTP
| BatchExportServiceDatabricks
| BatchExportServiceAzureBlob
| BatchExportRealtimeDestinationBackfill

export type PipelineInterval = 'hour' | 'day' | 'every 5 minutes'
96 changes: 96 additions & 0 deletions posthog/api/test/batch_exports/test_create.py
@@ -1073,6 +1073,102 @@ def test_creating_databricks_batch_export_fails_if_integration_is_not_the_correc
assert response.json()["detail"] == "Integration is not a Databricks integration."


def test_creating_azure_blob_batch_export_fails_if_feature_flag_is_not_enabled(
client: HttpClient,
team,
user,
):
"""Test that creating an Azure Blob batch export fails if the feature flag is not enabled."""
destination_data = {
"type": "AzureBlob",
"config": {
"container_name": "test-container",
},
}

batch_export_data = {
"name": "my-azure-blob-destination",
"destination": destination_data,
"interval": "hour",
}

client.force_login(user)

with mock.patch(
"posthog.batch_exports.http.posthoganalytics.feature_enabled",
return_value=False,
):
response = create_batch_export(
client,
team.pk,
batch_export_data,
)

assert response.status_code == status.HTTP_403_FORBIDDEN, response.json()
assert "Azure Blob Storage batch exports are not enabled for this team." in response.json()["detail"]


@pytest.fixture
def azure_blob_integration(team, user):
"""Create an Azure Blob integration."""
return Integration.objects.create(
team=team,
kind=Integration.IntegrationKind.AZURE_BLOB,
integration_id="my-storage-account",
config={},
sensitive_config={
"connection_string": "DefaultEndpointsProtocol=https;AccountName=my-storage-account;AccountKey=my-key;EndpointSuffix=core.windows.net"
},
created_by=user,
)


def test_creating_azure_blob_batch_export_using_integration(
client: HttpClient, temporal, organization, team, user, azure_blob_integration
):
"""Test that we can create an Azure Blob batch export using an integration."""
destination_data = {
"type": "AzureBlob",
"config": {
"container_name": "test-container",
"prefix": "test-prefix/",
},
"integration": azure_blob_integration.id,
}

batch_export_data = {
"name": "my-azure-blob-destination",
"destination": destination_data,
"interval": "hour",
}

client.force_login(user)

with mock.patch(
"posthog.batch_exports.http.posthoganalytics.feature_enabled",
return_value=True,
):
response = create_batch_export(
client,
team.pk,
batch_export_data,
)

assert response.status_code == status.HTTP_201_CREATED, response.json()

data = response.json()
assert data["destination"]["type"] == "AzureBlob"
assert data["destination"]["config"]["container_name"] == "test-container"
assert data["destination"]["config"]["prefix"] == "test-prefix/"
assert data["interval"] == "hour"

temporal_schedule = describe_schedule(temporal, data["id"])
assert temporal_schedule is not None
assert temporal_schedule.schedule is not None
assert isinstance(temporal_schedule.schedule.action, ScheduleActionStartWorkflow)
assert temporal_schedule.schedule.action.workflow == "azure-blob-export"


@pytest.mark.parametrize(
"model,expected_status,expected_error",
[
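Taken together, the two new tests pin down the request contract: non-secret settings travel in destination.config, while credentials are referenced through an azure-blob Integration. As a rough sketch only, the same request could be issued against the API roughly as follows; the endpoint path, host, and auth header are assumptions, not part of this PR.

import requests

# Request body mirroring the payload built in the tests above.
payload = {
    "name": "my-azure-blob-destination",
    "interval": "hour",
    "destination": {
        "type": "AzureBlob",
        "integration": 42,  # id of an Integration row with kind "azure-blob"
        "config": {
            "container_name": "test-container",
            "prefix": "test-prefix/",
        },
    },
}

# Assumed endpoint shape and auth scheme; adjust host, project id and key to your setup.
response = requests.post(
    "https://us.posthog.com/api/projects/1/batch_exports/",
    headers={"Authorization": "Bearer <personal-api-key>"},
    json=payload,
)
response.raise_for_status()
print(response.json()["id"])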
16 changes: 16 additions & 0 deletions posthog/batch_exports/http.py
@@ -469,7 +469,23 @@ def validate_destination(self, destination_attrs: dict):
DatabricksIntegration(integration)
except DatabricksIntegrationError as e:
raise serializers.ValidationError(str(e))
if destination_type == BatchExportDestination.Destination.AZURE_BLOB:
team_id = self.context["team_id"]
team = Team.objects.get(id=team_id)

if not posthoganalytics.feature_enabled(
"azure-blob-batch-exports",
str(team.uuid),
groups={"organization": str(team.organization.id)},
group_properties={
"organization": {
"id": str(team.organization.id),
"created_at": team.organization.created_at,
}
},
send_feature_flag_events=False,
):
raise PermissionDenied("Azure Blob Storage batch exports are not enabled for this team.")
if destination_type == BatchExportDestination.Destination.REDSHIFT:
config = destination_attrs["config"]
view = self.context.get("view")
5 changes: 3 additions & 2 deletions posthog/batch_exports/models.py
@@ -30,6 +30,7 @@ class Destination(models.TextChoices):
REDSHIFT = "Redshift"
BIGQUERY = "BigQuery"
DATABRICKS = "Databricks"
AZURE_BLOB = "AzureBlob"
WORKFLOWS = "Workflows"
HTTP = "HTTP"
NOOP = "NoOp"
@@ -40,8 +41,8 @@ class Destination(models.TextChoices):
"Postgres": {"user", "password"},
"Redshift": {"user", "password", "aws_access_key_id", "aws_secret_access_key"},
"BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"},
# Databricks does not have any secret fields, as we use integrations to store credentials
"Databricks": set(),
"Databricks": set(), # uses Integration model to store credentials
"AzureBlob": set(), # uses Integration model to store credentials
"HTTP": {"token"},
"NoOp": set(),
"Workflows": set(),
20 changes: 20 additions & 0 deletions posthog/batch_exports/service.py
@@ -313,6 +313,25 @@ class DatabricksBatchExportInputs(BaseBatchExportInputs):
use_automatic_schema_evolution: bool = True


@dataclass(kw_only=True)
class AzureBlobBatchExportInputs(BaseBatchExportInputs):
"""Inputs for Azure Blob Storage export workflow.

NOTE: Connection credentials are stored in the Integration model.
The `integration_id` field from `BaseBatchExportInputs` is used to fetch them.
"""

container_name: str
prefix: str = ""
compression: str | None = None
file_format: str = "JSONLines"
max_file_size_mb: int | None = None

def __post_init__(self):
if self.max_file_size_mb:
self.max_file_size_mb = int(self.max_file_size_mb)


@dataclass(kw_only=True)
class WorkflowsBatchExportInputs(BaseBatchExportInputs):
"""Inputs for Workflows export workflow.
@@ -346,6 +365,7 @@ class NoOpInputs(BaseBatchExportInputs):
"Redshift": ("redshift-export", RedshiftBatchExportInputs),
"BigQuery": ("bigquery-export", BigQueryBatchExportInputs),
"Databricks": ("databricks-export", DatabricksBatchExportInputs),
"AzureBlob": ("azure-blob-export", AzureBlobBatchExportInputs),
"HTTP": ("http-export", HttpBatchExportInputs),
"NoOp": ("no-op", NoOpInputs),
"Workflows": ("workflows-export", WorkflowsBatchExportInputs),
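The dictionary extended above routes each destination type to its Temporal workflow name and inputs dataclass; the new entry is why the test earlier expects the schedule action to target "azure-blob-export". A minimal sketch of that resolution follows; DESTINATION_WORKFLOWS is an assumed name for the mapping, and the base-input fields used below (team_id, batch_export_id) are assumptions as well.

# Hypothetical illustration of resolving an AzureBlob destination to its
# workflow and inputs; only the AzureBlob entry and the dataclass fields
# come from this PR.
from posthog.batch_exports.service import (  # module path taken from this diff
    DESTINATION_WORKFLOWS,
    AzureBlobBatchExportInputs,
)

workflow_name, inputs_cls = DESTINATION_WORKFLOWS["AzureBlob"]
assert workflow_name == "azure-blob-export"
assert inputs_cls is AzureBlobBatchExportInputs

inputs = inputs_cls(
    team_id=1,                        # assumed BaseBatchExportInputs field
    batch_export_id="some-uuid",      # assumed BaseBatchExportInputs field
    container_name="test-container",  # fields defined in this PR
    prefix="test-prefix/",
    file_format="JSONLines",
    max_file_size_mb=100,             # __post_init__ re-casts truthy values to int
)
assert inputs.max_file_size_mb == 100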
64 changes: 64 additions & 0 deletions posthog/migrations/0963_add_azure_blob_destination.py
@@ -0,0 +1,64 @@
# Generated by Django 4.2.26 on 2025-12-23 13:33

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0962_webanalyticsfilterpreset"),
]

operations = [
migrations.AlterField(
model_name="batchexportdestination",
name="type",
field=models.CharField(
choices=[
("S3", "S3"),
("Snowflake", "Snowflake"),
("Postgres", "Postgres"),
("Redshift", "Redshift"),
("BigQuery", "Bigquery"),
("Databricks", "Databricks"),
("AzureBlob", "Azure Blob"),
("Workflows", "Workflows"),
("HTTP", "Http"),
("NoOp", "Noop"),
],
help_text="A choice of supported BatchExportDestination types.",
max_length=64,
),
),
migrations.AlterField(
model_name="integration",
name="kind",
field=models.CharField(
choices=[
("slack", "Slack"),
("salesforce", "Salesforce"),
("hubspot", "Hubspot"),
("google-pubsub", "Google Pubsub"),
("google-cloud-storage", "Google Cloud Storage"),
("google-ads", "Google Ads"),
("google-sheets", "Google Sheets"),
("snapchat", "Snapchat"),
("linkedin-ads", "Linkedin Ads"),
("reddit-ads", "Reddit Ads"),
("tiktok-ads", "Tiktok Ads"),
("bing-ads", "Bing Ads"),
("intercom", "Intercom"),
("email", "Email"),
("linear", "Linear"),
("github", "Github"),
("gitlab", "Gitlab"),
("meta-ads", "Meta Ads"),
("twilio", "Twilio"),
("clickup", "Clickup"),
("vercel", "Vercel"),
("databricks", "Databricks"),
("azure-blob", "Azure Blob"),
],
max_length=20,
),
),
]
2 changes: 1 addition & 1 deletion posthog/migrations/max_migration.txt
@@ -1 +1 @@
0962_webanalyticsfilterpreset
0963_add_azure_blob_destination
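Finally, a note on applying the schema change: the new migration and the max_migration.txt bump are picked up by Django's standard migrate machinery, sketched here programmatically; the app label "posthog" comes from the migration's dependencies.

# Equivalent to `python manage.py migrate posthog 0963_add_azure_blob_destination`
# in a local checkout.
from django.core.management import call_command

call_command("migrate", "posthog", "0963_add_azure_blob_destination")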