Skip to content

feat: Support GCP PubSub for publishmq & internalmq #327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# ============================== Dev ==============================
# GCP PubSub Emulator
# PUBSUB_EMULATOR_HOST="gcp:8085"

# ============================== Outpost ==============================

# API
PORT="3333"
API_PORT="3333"
Expand Down Expand Up @@ -26,21 +32,29 @@ POSTGRES_URL="postgres://outpost:outpost@postgres:5432/outpost?sslmode=disable"
# ============================== Infra MQ ==============================
# MQs, Uncomment the one you want to use

## AWS SQS
## RabbitMQ
RABBITMQ_SERVER_URL="amqp://guest:guest@rabbitmq:5672"
RABBITMQ_EXCHANGE="outpost"
RABBITMQ_DELIVERY_QUEUE="outpost-delivery"
RABBITMQ_LOG_QUEUE="outpost-log"

## AWS SQS
# AWS_SQS_ENDPOINT="http://aws:4566"
# AWS_SQS_REGION="eu-central-1"
# AWS_SQS_ACCESS_KEY_ID="test"
# AWS_SQS_SECRET_ACCESS_KEY="test"
# AWS_SQS_DELIVERY_QUEUE="outpost-delivery" # optional
# AWS_SQS_LOG_QUEUE="outpost-log" # optional

## RabbitMQ
## GCP PubSub
# GCP_PUBSUB_PROJECT="test"
# GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS=""
# GCP_PUBSUB_DELIVERY_TOPIC="outpost-delivery"
# GCP_PUBSUB_DELIVERY_SUBSCRIPTION="outpost-delivery-sub"
# GCP_PUBSUB_LOG_TOPIC="outpost-log"
# GCP_PUBSUB_LOG_SUBSCRIPTION="outpost-log-sub"


RABBITMQ_SERVER_URL="amqp://guest:guest@rabbitmq:5672"
RABBITMQ_EXCHANGE="outpost"
RABBITMQ_DELIVERY_QUEUE="outpost-delivery"
RABBITMQ_LOG_QUEUE="outpost-log"

# ============================== PublishMQ ==============================

Expand All @@ -56,6 +70,13 @@ RABBITMQ_LOG_QUEUE="outpost-log"
# PUBLISH_AWS_SQS_SECRET_ACCESS_KEY="test"
# PUBLISH_AWS_SQS_QUEUE="publish_sqs_queue"

## GCP PubSub
# PUBLISH_GCP_PUBSUB_PROJECT="test"
# PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS=""
# PUBLISH_GCP_PUBSUB_TOPIC="outpost-publish"
# PUBLISH_GCP_PUBSUB_SUBSCRIPTION="outpost-publish-sub"


# ============================== Others ==============================

# Portal
Expand Down
1 change: 1 addition & 0 deletions .env.test
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
TEST_POSTGRES_URL="localhost:35432"
TEST_CLICKHOUSE_URL="localhost:39000"
TEST_LOCALSTACK_URL="localhost:34566"
TEST_GCP_URL="localhost:38085"
TEST_RABBITMQ_URL="localhost:35672"
TEST_MOCKSERVER_URL="localhost:35555"
22 changes: 22 additions & 0 deletions build/dev/deps/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ services:
volumes:
- aws:/var/lib/localstack

gcp:
image: google/cloud-sdk:latest
command:
[
"gcloud",
"beta",
"emulators",
"pubsub",
"start",
"--host-port=0.0.0.0:8085",
"--project=test",
]
ports:
- "8085:8085"
volumes:
- gcp-config:/root/.config
- gcp-kube:/root/.kube

volumes:
redis:
driver: local
Expand All @@ -89,6 +107,10 @@ volumes:
driver: local
aws:
driver: local
gcp-config:
driver: local
gcp-kube:
driver: local

networks:
default:
Expand Down
14 changes: 14 additions & 0 deletions build/test/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,17 @@ services:
ports:
- 34566:4566
- 34571:4571
gcp:
image: google/cloud-sdk:latest
command:
[
"gcloud",
"beta",
"emulators",
"pubsub",
"start",
"--host-port=0.0.0.0:8085",
"--project=test",
]
ports:
- "38085:8085"
2 changes: 2 additions & 0 deletions cmd/publish/declare_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ func handleDeclare(w http.ResponseWriter, r *http.Request) {
switch r.URL.Query().Get("method") {
case "aws_sqs":
err = declareAWS()
case "gcp_pubsub":
err = declareGCP()
case "rabbitmq":
err = declareRabbitMQ()
case "http":
Expand Down
115 changes: 115 additions & 0 deletions cmd/publish/publish_gcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package main

import (
"context"
"encoding/json"
"log"

"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
GCPEndpoint = "localhost:8085"
GCPProjectID = "test"
GCPPublishTopicName = "outpost-publish"
GCPPublishSubName = "outpost-publish-sub"
)

func publishGCP(body map[string]interface{}) error {
log.Printf("[x] Publishing GCP PubSub")

ctx := context.Background()
client, err := getGCPClient(ctx)
if err != nil {
return err
}
defer client.Close()

topic := client.Topic(GCPPublishTopicName)
exists, err := topic.Exists(ctx)
if err != nil {
return err
}
if !exists {
log.Printf("[!] Topic %s does not exist. Creating it first.", GCPPublishTopicName)
if err := declareGCP(); err != nil {
return err
}
topic = client.Topic(GCPPublishTopicName)
}

messageBody, err := json.Marshal(body)
if err != nil {
return err
}

result := topic.Publish(ctx, &pubsub.Message{
Data: messageBody,
Attributes: map[string]string{
"source": "outpost-publish",
},
})

_, err = result.Get(ctx)
if err != nil {
return err
}

log.Printf("[x] Published message to GCP PubSub topic %s", GCPPublishTopicName)
return nil
}

func declareGCP() error {
log.Printf("[*] Declaring GCP Publish infra")
ctx := context.Background()
client, err := getGCPClient(ctx)
if err != nil {
return err
}
defer client.Close()

// Create topic if it doesn't exist
topic, err := client.CreateTopic(ctx, GCPPublishTopicName)
if err != nil {
if err.Error() == "rpc error: code = AlreadyExists desc = Topic already exists" {
log.Printf("[*] Topic %s already exists", GCPPublishTopicName)
} else {
return err
}
} else {
log.Printf("[*] Topic %s created successfully", topic.ID())
}

// Create subscription if it doesn't exist
_, err = client.CreateSubscription(ctx, GCPPublishSubName, pubsub.SubscriptionConfig{
Topic: topic,
})
if err != nil {
if err.Error() == "rpc error: code = AlreadyExists desc = Subscription already exists" {
log.Printf("[*] Subscription %s already exists", GCPPublishSubName)
} else {
return err
}
} else {
log.Printf("[*] Subscription %s created successfully", GCPPublishSubName)
}

return nil
}

func getGCPClient(ctx context.Context) (*pubsub.Client, error) {
client, err := pubsub.NewClient(
ctx,
GCPProjectID,
option.WithEndpoint(GCPEndpoint),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
)
if err != nil {
return nil, err
}
return client, nil
}
2 changes: 2 additions & 0 deletions cmd/publish/publish_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func handlePublish(w http.ResponseWriter, r *http.Request) {
switch r.URL.Query().Get("method") {
case "aws_sqs":
err = publishAWS(body)
case "gcp_pubsub":
err = publishGCP(body)
case "rabbitmq":
err = publishRabbitMQ(body)
case "http":
Expand Down
4 changes: 2 additions & 2 deletions cmd/publish/publish_rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func publishRabbitMQ(body map[string]interface{}) error {
log.Printf("[x] Publishing RabbitMQ")

conn, err := amqp091.Dial(RabbitMQServerURL)
defer conn.Close()
if err != nil {
return err
}
defer conn.Close()
ch, err := conn.Channel()
defer ch.Close()
if err != nil {
return err
}
defer ch.Close()

messageBody, err := json.Marshal(body)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions docs/pages/guides/deployment.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ Outpost requires the following dependencies:

- Redis
- PostgreSQL
- A log queue such as RabbitMQ or AWS SQS
- A delivery queue such as RabbitMQ or AWS SQS
- Optionally, a publish queue such as RabbitMQ or AWS SQS
- Using RabbitMQ, AWS SQS, or GCP Pub/Sub for the following queues:
- Log queue
- Delivery queue
- Publish queue (optional)

Each of these will need to be provisioned and allocated sufficient resources based on expected usage and load.
76 changes: 76 additions & 0 deletions docs/pages/guides/publish-from-gcp-pubsub.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
---
title: "Publish from GCP Pub/Sub"
---

This guide provides information on using GCP Pub/Sub to publish events to Outpost.

## Message Structure

GCP Pub/Sub messages should have the same payload structure as the [Publish API endpoint](/docs/references/api#publish).

```json
{
"tenant_id": "<TENANT_ID>",
"destination_id": "<DESTINATION_ID>", // Optional. Provide a way of routing events to a specific destination
"topic": "topic.name", // Topic defined in TOPICS environment variable
"eligible_for_retry": true | false, // Should event delivery be retried. Default is true.
"metadata": Payload, // can by any JSON payload,
"data": Payload // can by any JSON payload
}
```

## GCP Pub/Sub Setup

[Create a topic](https://cloud.google.com/pubsub/docs/create-topic) and [create a pull subscription](https://cloud.google.com/pubsub/docs/create-subscription) within GCP Pub/Sub.

Once you have these in place, you can configure Outpost to subscribe to receive events and then publish them to subscribed Outpost Destinations.

## Configuration

Provide Outpost with credentials, topic, and subscription for your GCP Pub/Sub instance used for publishing events.

### Environment Variables

```
PUBLISH_GCP_PUBSUB_PROJECT=""
PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS=""
PUBLISH_GCP_PUBSUB_TOPIC=""
PUBLISH_GCP_PUBSUB_SUBSCRIPTION=""
```

#### Example

```
PUBLISH_GCP_PUBSUB_PROJECT="my-gcp-project"
PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS="single_line_json_string"
PUBLISH_GCP_PUBSUB_TOPIC="outpost-publish"
PUBLISH_GCP_PUBSUB_SUBSCRIPTION="outpost-publish-sub"
```

### YAML

```yaml
publishmq:
gcp_pubsub:
project: <GCP_PROJECT>
service_account_credentials: <GCP_SERVICE_ACCOUNT_CREDENTIALS>
topic: <GCP_PUBSUB_TOPIC>
subscription: <GCP_PUBSUB_SUBSCRIPTION>
```

#### Example

```yaml
publishmq:
gcp_pubsub:
project: "GCP_PUBSUB_PROJECT" # GCP project ID
service_account_credentials: "GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS" # Contents of service account credentials JSON file
topic: "outpost-publish" # Pub/Sub to read for published events
subscription: "outpost-publish-sub" # Pub/Sub subscription for published events
```

### Troubleshooting

- [Ask a question](https://github.com/hookdeck/outpost/discussions/new?category=q-a)
- [Report a bug](https://github.com/hookdeck/outpost/issues/new?assignees=&labels=bug&projects=&template=bug_report.md&title=%F0%9F%90%9B+Bug+Report%3A+)
- [Request a feature](https://github.com/hookdeck/outpost/issues/new?assignees=&labels=enhancement&projects=&template=feature_request.md&title=%F0%9F%9A%80+Feature%3A+)
4 changes: 2 additions & 2 deletions docs/pages/guides/publish-from-rabbitmq.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RabbitMQ messages should have the same payload structure as the [Publish API end
"tenant_id": "<TENANT_ID>",
"destination_id": "<DESTINATION_ID>", // Optional. Provide a way of routing events to a specific destination
"topic": "topic.name", // Topic defined in TOPICS environment variable
"eligible_for_retry": false | true, // Should event delivery be retried. Default is false.
"eligible_for_retry": true | false, // Should event delivery be retried. Default is true.
"metadata": Payload, // can by any JSON payload,
"data": Payload // can by any JSON payload
}
Expand Down Expand Up @@ -42,7 +42,7 @@ PUBLISH_RABBITMQ_QUEUE="publish"
### YAML

```yaml
mqs:
publishmq:
publishmq:
rabbitmq:
server_url: <SERVER_URL>
Expand Down
Loading