Skip to content

AsyncAPI #1935

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 47 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
2f65747
WIP: async-gateway workload creation service
Mar 4, 2021
da65250
WIP: async-gateway workload creation endpoint
Mar 5, 2021
e5471fa
WIP: async-gateway result retrieval service implementation
Mar 5, 2021
ab6ae29
Working async-gateway
Mar 5, 2021
59ab15b
Merge branch 'master' into features/async
Mar 5, 2021
1b697b8
Update comments
Mar 5, 2021
27305e3
Improve logging
Mar 5, 2021
18987ea
Fix routes and add health route
Mar 5, 2021
8ba7704
Update health route
Mar 5, 2021
6273fcd
Add in_queue status to created workload
Mar 5, 2021
9156e74
Improve logging in async-gateway
Mar 8, 2021
29ac5df
Remove fmt.Sprintf statements from zap, use key/values instead
Mar 8, 2021
d3aa8ae
Added python code for AsyncAPI workloads
Mar 9, 2021
926d7dc
Improved python logging with extra fields
Mar 9, 2021
483711c
WIP: Add AsyncAPI to the cortex operator
Mar 10, 2021
909ea56
Merge branch 'master' into features/async
Mar 10, 2021
55b1385
WIP: Add delete and update functions for AsyncAPI
Mar 11, 2021
b5a693c
Add delete and update to operator endpoints for AsyncAPI
Mar 11, 2021
da29525
Merge branch 'master' into features/async
vishalbollu Mar 12, 2021
f9d26da
Merge branch 'master' into features/async
vishalbollu Mar 12, 2021
8771f2d
AsyncAPI progress
vishalbollu Mar 12, 2021
29bb756
Remove debug prints and fix async API update
Mar 12, 2021
a1570bb
Add newline to async/sample.json
Mar 12, 2021
d273dcd
Fix python code issues mentioned in the PR
Mar 12, 2021
102dd7f
Add deploymentID to async queue name
Mar 12, 2021
bb7d2b8
Add autoscaler cron to async api
Mar 12, 2021
ec36fb5
Merge branch 'master' into features/async
Mar 12, 2021
00121eb
Update async predictor
Mar 15, 2021
0ce539e
Set operator log level to debug in dev/operator_local.sh
Mar 15, 2021
9f7cdf0
Change async autoscaler to use prometheus
Mar 15, 2021
91a598c
Add service monitor for operator
Mar 15, 2021
a647e96
Delete payload from S3 on completion and failure
Mar 15, 2021
3776a71
Commit go.sum
Mar 15, 2021
6a9695e
Fix sqs queue name parsing
vishalbollu Mar 15, 2021
19bf3b5
Merge branch 'features/async' of github.com:cortexlabs/cortex into fe…
vishalbollu Mar 15, 2021
fb67bd4
Improve exception messages in async python code
Mar 15, 2021
765b8a2
Merge branch 'master' into features/async
RobertLucian Mar 15, 2021
5a470a4
Merge branch 'master' into features/async
vishalbollu Mar 15, 2021
fea298a
PR nits
vishalbollu Mar 15, 2021
365c358
Remove TODOs
vishalbollu Mar 15, 2021
3dd4f2f
Merge branch 'master' into features/async
vishalbollu Mar 15, 2021
744c357
Post multi instance type merge changes
vishalbollu Mar 15, 2021
42c636f
Post multi instance type merge #2
vishalbollu Mar 15, 2021
f72913d
Nits
vishalbollu Mar 15, 2021
784989b
Add cx logs support for async
vishalbollu Mar 16, 2021
2504478
Fix update message for async
vishalbollu Mar 16, 2021
21f9a01
Merge branch 'master' into features/async
vishalbollu Mar 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions async-gateway/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2021 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"encoding/json"
"fmt"
"net/http"

"github.com/gorilla/mux"
"go.uber.org/zap"
)

// Endpoint wraps an async-gateway Service with HTTP logic
type Endpoint struct {
service Service
logger *zap.Logger
}

// NewEndpoint creates and initializes a new Endpoint struct
func NewEndpoint(svc Service, logger *zap.Logger) *Endpoint {
return &Endpoint{
service: svc,
logger: logger,
}
}

// CreateWorkload is a handler for the async-gateway service workload creation route
func (e *Endpoint) CreateWorkload(w http.ResponseWriter, r *http.Request) {
requestID := r.Header.Get("x-request-id")
if requestID == "" {
respondPlainText(w, http.StatusBadRequest, "error: missing x-request-id key in request header")
return
}

contentType := r.Header.Get("Content-Type")
if contentType == "" {
respondPlainText(w, http.StatusBadRequest, "error: missing Content-Type key in request header")
return
}

body := r.Body
defer func() {
_ = r.Body.Close()
}()

log := e.logger.With(zap.String("id", requestID), zap.String("contentType", contentType))

id, err := e.service.CreateWorkload(requestID, body, contentType)
if err != nil {
log.Error("failed to create workload", zap.Error(err))
respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err))
return
}

if err = respondJSON(w, http.StatusOK, CreateWorkloadResponse{ID: id}); err != nil {
log.Error("failed to encode json response", zap.Error(err))
return
}
}

// GetWorkload is a handler for the async-gateway service workload retrieval route
func (e *Endpoint) GetWorkload(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, ok := vars["id"]
if !ok {
respondPlainText(w, http.StatusBadRequest, "error: missing request id in url path")
return
}

log := e.logger.With(zap.String("id", id))

res, err := e.service.GetWorkload(id)
if err != nil {
log.Error("failed to get workload", zap.Error(err))
respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err))
return
}

if err = respondJSON(w, http.StatusOK, res); err != nil {
log.Error("failed to encode json response", zap.Error(err))
return
}
}

func respondPlainText(w http.ResponseWriter, statusCode int, message string) {
w.WriteHeader(statusCode)
w.Header().Set("Content-Type", "text/plain")
_, _ = w.Write([]byte(message))
}

func respondJSON(w http.ResponseWriter, statusCode int, s interface{}) error {
w.WriteHeader(statusCode)
w.Header().Set("Content-Type", "application/json")
return json.NewEncoder(w).Encode(s)
}
9 changes: 9 additions & 0 deletions async-gateway/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module github.com/cortexlabs/async-gateway

go 1.15

require (
github.com/aws/aws-sdk-go v1.37.23
github.com/gorilla/mux v1.8.0
go.uber.org/zap v1.16.0
)
72 changes: 72 additions & 0 deletions async-gateway/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aws/aws-sdk-go v1.37.23 h1:bO80NcSmRv52w+GFpBegoLdlP/Z0OwUqQ9bbeCLCy/0=
github.com/aws/aws-sdk-go v1.37.23/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
135 changes: 135 additions & 0 deletions async-gateway/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright 2021 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"flag"
"net/http"
"os"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/gorilla/mux"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
_defaultPort = "8080"
)

func createLogger() (*zap.Logger, error) {
logLevelEnv := os.Getenv("CORTEX_LOG_LEVEL")
disableJSONLogging := os.Getenv("CORTEX_DISABLE_JSON_LOGGING")

var logLevelZap zapcore.Level
switch logLevelEnv {
case "DEBUG":
logLevelZap = zapcore.DebugLevel
case "WARNING":
logLevelZap = zapcore.WarnLevel
case "ERROR":
logLevelZap = zapcore.ErrorLevel
default:
logLevelZap = zapcore.InfoLevel
}

encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.MessageKey = "message"

encoding := "json"
if strings.ToLower(disableJSONLogging) == "true" {
encoding = "console"
}

return zap.Config{
Level: zap.NewAtomicLevelAt(logLevelZap),
Encoding: encoding,
EncoderConfig: encoderConfig,
OutputPaths: []string{"stdout"},
ErrorOutputPaths: []string{"stderr"},
}.Build()
}

// usage: ./gateway -bucket <bucket> -region <region> -port <port> -queue queue <apiName>
func main() {
log, err := createLogger()
if err != nil {
panic(err)
}
defer func() {
_ = log.Sync()
}()

var (
port = flag.String("port", _defaultPort, "port on which the gateway server runs on")
queueURL = flag.String("queue", "", "SQS queue URL")
region = flag.String("region", "", "AWS region")
bucket = flag.String("bucket", "", "AWS bucket")
clusterName = flag.String("cluster", "", "cluster name")
)
flag.Parse()

switch {
case *queueURL == "":
log.Fatal("missing required option: -queue")
case *region == "":
log.Fatal("missing required option: -region")
case *bucket == "":
log.Fatal("missing required option: -bucket")
case *clusterName == "":
log.Fatal("missing required option: -cluster")
}

apiName := flag.Arg(0)
if apiName == "" {
log.Fatal("apiName argument was not provided")
}

sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Region: region,
},
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
log.Fatal("failed to create AWS session: %s", zap.Error(err))
}

s3Storage := NewS3(sess, *bucket)

sqsQueue := NewSQS(*queueURL, sess)

svc := NewService(*clusterName, apiName, sqsQueue, s3Storage, log)
ep := NewEndpoint(svc, log)

router := mux.NewRouter()
router.HandleFunc("/", ep.CreateWorkload).Methods("POST")
router.HandleFunc(
"/healthz",
func(w http.ResponseWriter, r *http.Request) {
respondPlainText(w, http.StatusOK, "ok")
},
)
router.HandleFunc("/{id}", ep.GetWorkload).Methods("GET")

log.Info("Running on port " + *port)
if err = http.ListenAndServe(":"+*port, router); err != nil {
log.Fatal("failed to start server", zap.Error(err))
}
}
51 changes: 51 additions & 0 deletions async-gateway/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2021 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
awssqs "github.com/aws/aws-sdk-go/service/sqs"
)

// Queue is an interface to abstract communication with event queues
type Queue interface {
SendMessage(message string, uniqueID string) error
}

type sqs struct {
queueURL string
client *awssqs.SQS
}

// NewSQS creates a new SQS client that satisfies the Queue interface
func NewSQS(queueURL string, sess *session.Session) Queue {
client := awssqs.New(sess)

return &sqs{queueURL: queueURL, client: client}
}

// SendMessage sends a string
func (q *sqs) SendMessage(message string, uniqueID string) error {
_, err := q.client.SendMessage(&awssqs.SendMessageInput{
MessageBody: aws.String(message),
MessageDeduplicationId: aws.String(uniqueID),
MessageGroupId: aws.String(uniqueID),
QueueUrl: aws.String(q.queueURL),
})
return err
}
Loading