Skip to content

Commit 3093d9f

Browse files
alexluongleggetter
andauthored
feat: Support GCP PubSub for publishmq & internalmq (#327)
* feat: GCP PubSub integration for publishmq * chore: Remove unused code * feat: GCP PubSub infra for internal mqs * chore: GCP PubSub .env.example * chore: Rename GCP PubSub publish helper * chore: Rename files to specify full service name * chore(docs): Update for GCP Pub/Sub internal and publish queue * feat(examples): GCP publish example --------- Co-authored-by: Phil Leggetter <phil@leggetter.co.uk>
1 parent c0715ce commit 3093d9f

39 files changed

+3628
-593
lines changed

.env.example

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
# ============================== Dev ==============================
2+
# GCP PubSub Emulator
3+
# PUBSUB_EMULATOR_HOST="gcp:8085"
4+
5+
# ============================== Outpost ==============================
6+
17
# API
28
PORT="3333"
39
API_PORT="3333"
@@ -26,21 +32,29 @@ POSTGRES_URL="postgres://outpost:outpost@postgres:5432/outpost?sslmode=disable"
2632
# ============================== Infra MQ ==============================
2733
# MQs, Uncomment the one you want to use
2834

29-
## AWS SQS
35+
## RabbitMQ
36+
RABBITMQ_SERVER_URL="amqp://guest:guest@rabbitmq:5672"
37+
RABBITMQ_EXCHANGE="outpost"
38+
RABBITMQ_DELIVERY_QUEUE="outpost-delivery"
39+
RABBITMQ_LOG_QUEUE="outpost-log"
3040

41+
## AWS SQS
3142
# AWS_SQS_ENDPOINT="http://aws:4566"
3243
# AWS_SQS_REGION="eu-central-1"
3344
# AWS_SQS_ACCESS_KEY_ID="test"
3445
# AWS_SQS_SECRET_ACCESS_KEY="test"
3546
# AWS_SQS_DELIVERY_QUEUE="outpost-delivery" # optional
3647
# AWS_SQS_LOG_QUEUE="outpost-log" # optional
3748

38-
## RabbitMQ
49+
## GCP PubSub
50+
# GCP_PUBSUB_PROJECT="test"
51+
# GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS=""
52+
# GCP_PUBSUB_DELIVERY_TOPIC="outpost-delivery"
53+
# GCP_PUBSUB_DELIVERY_SUBSCRIPTION="outpost-delivery-sub"
54+
# GCP_PUBSUB_LOG_TOPIC="outpost-log"
55+
# GCP_PUBSUB_LOG_SUBSCRIPTION="outpost-log-sub"
56+
3957

40-
RABBITMQ_SERVER_URL="amqp://guest:guest@rabbitmq:5672"
41-
RABBITMQ_EXCHANGE="outpost"
42-
RABBITMQ_DELIVERY_QUEUE="outpost-delivery"
43-
RABBITMQ_LOG_QUEUE="outpost-log"
4458

4559
# ============================== PublishMQ ==============================
4660

@@ -56,6 +70,13 @@ RABBITMQ_LOG_QUEUE="outpost-log"
5670
# PUBLISH_AWS_SQS_SECRET_ACCESS_KEY="test"
5771
# PUBLISH_AWS_SQS_QUEUE="publish_sqs_queue"
5872

73+
## GCP PubSub
74+
# PUBLISH_GCP_PUBSUB_PROJECT="test"
75+
# PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS=""
76+
# PUBLISH_GCP_PUBSUB_TOPIC="outpost-publish"
77+
# PUBLISH_GCP_PUBSUB_SUBSCRIPTION="outpost-publish-sub"
78+
79+
5980
# ============================== Others ==============================
6081

6182
# Portal

.env.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
TEST_POSTGRES_URL="localhost:35432"
22
TEST_CLICKHOUSE_URL="localhost:39000"
33
TEST_LOCALSTACK_URL="localhost:34566"
4+
TEST_GCP_URL="localhost:38085"
45
TEST_RABBITMQ_URL="localhost:35672"
56
TEST_MOCKSERVER_URL="localhost:35555"

build/dev/deps/compose.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,24 @@ services:
7878
volumes:
7979
- aws:/var/lib/localstack
8080

81+
gcp:
82+
image: google/cloud-sdk:latest
83+
command:
84+
[
85+
"gcloud",
86+
"beta",
87+
"emulators",
88+
"pubsub",
89+
"start",
90+
"--host-port=0.0.0.0:8085",
91+
"--project=test",
92+
]
93+
ports:
94+
- "8085:8085"
95+
volumes:
96+
- gcp-config:/root/.config
97+
- gcp-kube:/root/.kube
98+
8199
volumes:
82100
redis:
83101
driver: local
@@ -89,6 +107,10 @@ volumes:
89107
driver: local
90108
aws:
91109
driver: local
110+
gcp-config:
111+
driver: local
112+
gcp-kube:
113+
driver: local
92114

93115
networks:
94116
default:

build/test/compose.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,17 @@ services:
3232
ports:
3333
- 34566:4566
3434
- 34571:4571
35+
gcp:
36+
image: google/cloud-sdk:latest
37+
command:
38+
[
39+
"gcloud",
40+
"beta",
41+
"emulators",
42+
"pubsub",
43+
"start",
44+
"--host-port=0.0.0.0:8085",
45+
"--project=test",
46+
]
47+
ports:
48+
- "38085:8085"

cmd/publish/declare_handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ func handleDeclare(w http.ResponseWriter, r *http.Request) {
1010
switch r.URL.Query().Get("method") {
1111
case "aws_sqs":
1212
err = declareAWS()
13+
case "gcp_pubsub":
14+
err = declareGCP()
1315
case "rabbitmq":
1416
err = declareRabbitMQ()
1517
case "http":

cmd/publish/publish_gcp.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
8+
"cloud.google.com/go/pubsub"
9+
"google.golang.org/api/option"
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/credentials/insecure"
12+
)
13+
14+
const (
15+
GCPEndpoint = "localhost:8085"
16+
GCPProjectID = "test"
17+
GCPPublishTopicName = "outpost-publish"
18+
GCPPublishSubName = "outpost-publish-sub"
19+
)
20+
21+
func publishGCP(body map[string]interface{}) error {
22+
log.Printf("[x] Publishing GCP PubSub")
23+
24+
ctx := context.Background()
25+
client, err := getGCPClient(ctx)
26+
if err != nil {
27+
return err
28+
}
29+
defer client.Close()
30+
31+
topic := client.Topic(GCPPublishTopicName)
32+
exists, err := topic.Exists(ctx)
33+
if err != nil {
34+
return err
35+
}
36+
if !exists {
37+
log.Printf("[!] Topic %s does not exist. Creating it first.", GCPPublishTopicName)
38+
if err := declareGCP(); err != nil {
39+
return err
40+
}
41+
topic = client.Topic(GCPPublishTopicName)
42+
}
43+
44+
messageBody, err := json.Marshal(body)
45+
if err != nil {
46+
return err
47+
}
48+
49+
result := topic.Publish(ctx, &pubsub.Message{
50+
Data: messageBody,
51+
Attributes: map[string]string{
52+
"source": "outpost-publish",
53+
},
54+
})
55+
56+
_, err = result.Get(ctx)
57+
if err != nil {
58+
return err
59+
}
60+
61+
log.Printf("[x] Published message to GCP PubSub topic %s", GCPPublishTopicName)
62+
return nil
63+
}
64+
65+
func declareGCP() error {
66+
log.Printf("[*] Declaring GCP Publish infra")
67+
ctx := context.Background()
68+
client, err := getGCPClient(ctx)
69+
if err != nil {
70+
return err
71+
}
72+
defer client.Close()
73+
74+
// Create topic if it doesn't exist
75+
topic, err := client.CreateTopic(ctx, GCPPublishTopicName)
76+
if err != nil {
77+
if err.Error() == "rpc error: code = AlreadyExists desc = Topic already exists" {
78+
log.Printf("[*] Topic %s already exists", GCPPublishTopicName)
79+
} else {
80+
return err
81+
}
82+
} else {
83+
log.Printf("[*] Topic %s created successfully", topic.ID())
84+
}
85+
86+
// Create subscription if it doesn't exist
87+
_, err = client.CreateSubscription(ctx, GCPPublishSubName, pubsub.SubscriptionConfig{
88+
Topic: topic,
89+
})
90+
if err != nil {
91+
if err.Error() == "rpc error: code = AlreadyExists desc = Subscription already exists" {
92+
log.Printf("[*] Subscription %s already exists", GCPPublishSubName)
93+
} else {
94+
return err
95+
}
96+
} else {
97+
log.Printf("[*] Subscription %s created successfully", GCPPublishSubName)
98+
}
99+
100+
return nil
101+
}
102+
103+
func getGCPClient(ctx context.Context) (*pubsub.Client, error) {
104+
client, err := pubsub.NewClient(
105+
ctx,
106+
GCPProjectID,
107+
option.WithEndpoint(GCPEndpoint),
108+
option.WithoutAuthentication(),
109+
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
110+
)
111+
if err != nil {
112+
return nil, err
113+
}
114+
return client, nil
115+
}

cmd/publish/publish_handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ func handlePublish(w http.ResponseWriter, r *http.Request) {
1717
switch r.URL.Query().Get("method") {
1818
case "aws_sqs":
1919
err = publishAWS(body)
20+
case "gcp_pubsub":
21+
err = publishGCP(body)
2022
case "rabbitmq":
2123
err = publishRabbitMQ(body)
2224
case "http":

cmd/publish/publish_rabbitmq.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ func publishRabbitMQ(body map[string]interface{}) error {
1717
log.Printf("[x] Publishing RabbitMQ")
1818

1919
conn, err := amqp091.Dial(RabbitMQServerURL)
20-
defer conn.Close()
2120
if err != nil {
2221
return err
2322
}
23+
defer conn.Close()
2424
ch, err := conn.Channel()
25-
defer ch.Close()
2625
if err != nil {
2726
return err
2827
}
28+
defer ch.Close()
2929

3030
messageBody, err := json.Marshal(body)
3131
if err != nil {

docs/pages/guides/deployment.mdx

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ Outpost requires the following dependencies:
4242

4343
- Redis
4444
- PostgreSQL
45-
- A log queue such as RabbitMQ or AWS SQS
46-
- A delivery queue such as RabbitMQ or AWS SQS
47-
- Optionally, a publish queue such as RabbitMQ or AWS SQS
45+
- Using RabbitMQ, AWS SQS, or GCP Pub/Sub for the following queues:
46+
- Log queue
47+
- Delivery queue
48+
- Publish queue (optional)
4849

4950
Each of these will need to be provisioned and allocated sufficient resources based on expected usage and load.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
---
2+
title: "Publish from GCP Pub/Sub"
3+
---
4+
5+
This guide provides information on using GCP Pub/Sub to publish events to Outpost.
6+
7+
## Message Structure
8+
9+
GCP Pub/Sub messages should have the same payload structure as the [Publish API endpoint](/docs/references/api#publish).
10+
11+
```json
12+
{
13+
"tenant_id": "<TENANT_ID>",
14+
"destination_id": "<DESTINATION_ID>", // Optional. Provide a way of routing events to a specific destination
15+
"topic": "topic.name", // Topic defined in TOPICS environment variable
16+
"eligible_for_retry": true | false, // Should event delivery be retried. Default is true.
17+
"metadata": Payload, // can by any JSON payload,
18+
"data": Payload // can by any JSON payload
19+
}
20+
```
21+
22+
## GCP Pub/Sub Setup
23+
24+
[Create a topic](https://cloud.google.com/pubsub/docs/create-topic) and [create a pull subscription](https://cloud.google.com/pubsub/docs/create-subscription) within GCP Pub/Sub.
25+
26+
Once you have these in place, you can configure Outpost to subscribe to receive events and then publish them to subscribed Outpost Destinations.
27+
28+
## Configuration
29+
30+
Provide Outpost with credentials, topic, and subscription for your GCP Pub/Sub instance used for publishing events.
31+
32+
### Environment Variables
33+
34+
```
35+
PUBLISH_GCP_PUBSUB_PROJECT=""
36+
PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS=""
37+
PUBLISH_GCP_PUBSUB_TOPIC=""
38+
PUBLISH_GCP_PUBSUB_SUBSCRIPTION=""
39+
```
40+
41+
#### Example
42+
43+
```
44+
PUBLISH_GCP_PUBSUB_PROJECT="my-gcp-project"
45+
PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS="single_line_json_string"
46+
PUBLISH_GCP_PUBSUB_TOPIC="outpost-publish"
47+
PUBLISH_GCP_PUBSUB_SUBSCRIPTION="outpost-publish-sub"
48+
```
49+
50+
### YAML
51+
52+
```yaml
53+
publishmq:
54+
gcp_pubsub:
55+
project: <GCP_PROJECT>
56+
service_account_credentials: <GCP_SERVICE_ACCOUNT_CREDENTIALS>
57+
topic: <GCP_PUBSUB_TOPIC>
58+
subscription: <GCP_PUBSUB_SUBSCRIPTION>
59+
```
60+
61+
#### Example
62+
63+
```yaml
64+
publishmq:
65+
gcp_pubsub:
66+
project: "GCP_PUBSUB_PROJECT" # GCP project ID
67+
service_account_credentials: "GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS" # Contents of service account credentials JSON file
68+
topic: "outpost-publish" # Pub/Sub to read for published events
69+
subscription: "outpost-publish-sub" # Pub/Sub subscription for published events
70+
```
71+
72+
### Troubleshooting
73+
74+
- [Ask a question](https://github.com/hookdeck/outpost/discussions/new?category=q-a)
75+
- [Report a bug](https://github.com/hookdeck/outpost/issues/new?assignees=&labels=bug&projects=&template=bug_report.md&title=%F0%9F%90%9B+Bug+Report%3A+)
76+
- [Request a feature](https://github.com/hookdeck/outpost/issues/new?assignees=&labels=enhancement&projects=&template=feature_request.md&title=%F0%9F%9A%80+Feature%3A+)

0 commit comments

Comments
 (0)