Skip to content

Commit

Permalink
Added http sender and receiver for testing purpose
Browse files Browse the repository at this point in the history
  • Loading branch information
plaxomike committed May 25, 2021
1 parent 15498b7 commit 2767202
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 4 deletions.
12 changes: 8 additions & 4 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ issues:
linters:
- gosec
- path: internal/pkg/aws/s3/client\.go
text": "type assertion on error"
text: "type assertion on error"
linters:
- errorlint
- path: internal/pkg/aws/s3/client_test\.go
text": "type assertion on error"
text: "type assertion on error"
linters:
- errorlint
- path: internal/pkg/aws/s3/error\.go
text": "type assertion on error"
text: "type assertion on error"
linters:
- errorlint
- errorlint
- path: pkg/plugins/http/receiver\.go
text: "SA1029"
linters:
- staticcheck
98 changes: 98 additions & 0 deletions pkg/plugins/http/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package http

import (
"context"
"encoding/json"
"fmt"
"github.com/goccy/go-yaml"
"github.com/rs/zerolog"
"github.com/xmidt-org/ears/pkg/event"
pkgplugin "github.com/xmidt-org/ears/pkg/plugin"
"github.com/xmidt-org/ears/pkg/receiver"
"io/ioutil"
"net/http"
"os"
)

func NewReceiver(config interface{}) (receiver.Receiver, error) {
var cfg ReceiverConfig
var err error
switch c := config.(type) {
case string:
err = yaml.Unmarshal([]byte(c), &cfg)
case []byte:
err = yaml.Unmarshal(c, &cfg)
case ReceiverConfig:
cfg = c
case *ReceiverConfig:
cfg = *c
}
if err != nil {
return nil, &pkgplugin.InvalidConfigError{
Err: err,
}
}
err = cfg.Validate()
if err != nil {
return nil, err
}

logger := zerolog.New(os.Stdout).Level(zerolog.InfoLevel)
return &Receiver{
path: cfg.Path,
method: cfg.Method,
port: *cfg.Port,
logger: &logger,
}, nil
}

func (h *Receiver) GetTraceId(r *http.Request) string {
return r.Header.Get("traceId")
}

func (h *Receiver) Receive(next receiver.NextFn) error {

mux := http.NewServeMux()
h.srv = &http.Server{Addr: fmt.Sprintf(":%d", h.port), Handler: mux}

mux.HandleFunc(h.path, func(w http.ResponseWriter, r *http.Request) {
defer fmt.Fprintln(w, "good")

b, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
h.logger.Error().Str("error", err.Error()).Msg("error reading body")
return
}

var body interface{}
err = json.Unmarshal(b, &body)
if err != nil {
h.logger.Error().Str("error", err.Error()).Msg("error unmarshalling body")
return
}
ctx := context.Background()
event, err := event.New(ctx, body)
if err != nil {
h.logger.Error().Str("error", err.Error()).Msg("error creating event")
}

traceId := h.GetTraceId(r)
if traceId != "" {
subCtx := context.WithValue(event.Context(), "traceId", traceId)
event.SetContext(subCtx)
}

next(event)
})

return h.srv.ListenAndServe()
}

func (h *Receiver) StopReceiving(ctx context.Context) error {
if h.srv != nil {
h.logger.Info().Msg("Shutting down HTTP receiver")
return h.srv.Shutdown(ctx)
}
return nil
}
52 changes: 52 additions & 0 deletions pkg/plugins/http/receiver_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package http

import (
"fmt"
"github.com/xeipuuv/gojsonschema"
)

// Validate returns an error upon validation failure
func (rc *ReceiverConfig) Validate() error {
schema := gojsonschema.NewStringLoader(receiverSchema)
doc := gojsonschema.NewGoLoader(*rc)
result, err := gojsonschema.Validate(schema, doc)
if err != nil {
return err
}
if !result.Valid() {
return fmt.Errorf(fmt.Sprintf("%+v", result.Errors()))
}
return nil
}

const receiverSchema = `
{
"$schema": "http://json-schema.org/draft-06/schema#",
"$ref": "#/definitions/ReceiverConfig",
"definitions": {
"ReceiverConfig": {
"type": "object",
"additionalProperties": false,
"properties": {
"path": {
"type": "string"
},
"method": {
"type": "string"
},
"port": {
"type": "integer",
"minimum": 1024,
"maximum": 65535
}
},
"required": [
"path",
"port",
"method"
],
"title": "ReceiverConfig"
}
}
}
`
91 changes: 91 additions & 0 deletions pkg/plugins/http/sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package http

import (
"bytes"
"context"
"encoding/json"
"github.com/goccy/go-yaml"
"github.com/xmidt-org/ears/pkg/event"
pkgplugin "github.com/xmidt-org/ears/pkg/plugin"
"github.com/xmidt-org/ears/pkg/sender"
"io"
"io/ioutil"
"net/http"
"time"
)

const DEFAULT_TIMEOUT = 10

func NewSender(config interface{}) (sender.Sender, error) {
var cfg SenderConfig
var err error
switch c := config.(type) {
case string:
err = yaml.Unmarshal([]byte(c), &cfg)
case []byte:
err = yaml.Unmarshal(c, &cfg)
case SenderConfig:
cfg = c
case *SenderConfig:
cfg = *c
}
if err != nil {
return nil, &pkgplugin.InvalidConfigError{
Err: err,
}
}
err = cfg.Validate()
if err != nil {
return nil, err
}
return &Sender{
client: &http.Client{
Timeout: DEFAULT_TIMEOUT * time.Second,
},
method: cfg.Method,
url: cfg.Url,
}, nil
}

func (s *Sender) SetTraceId(r *http.Request, traceId string) {
r.Header.Set("traceId", traceId)
}

func (s *Sender) Send(event event.Event) {
payload := event.Payload()
body, err := json.Marshal(payload)
if err != nil {
event.Nack(err)
return
}

req, err := http.NewRequest(s.method, s.url, bytes.NewReader(body))
if err != nil {
event.Nack(err)
}

ctx := event.Context()
traceId := ctx.Value("traceId")
if traceId != nil {
s.SetTraceId(req, traceId.(string))
}

resp, err := s.client.Do(req)
if err != nil {
event.Nack(err)
}

io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest {
event.Nack(&BadHttpStatusError{resp.StatusCode})
}
}

func (s *Sender) Unwrap() sender.Sender {
return nil
}

func (s *Sender) StopSending(ctx context.Context) {
//nothing
}
46 changes: 46 additions & 0 deletions pkg/plugins/http/sender_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package http

import (
"fmt"
"github.com/xeipuuv/gojsonschema"
)

// Validate
func (sc *SenderConfig) Validate() error {
schema := gojsonschema.NewStringLoader(senderSchema)
doc := gojsonschema.NewGoLoader(*sc)
result, err := gojsonschema.Validate(schema, doc)
if err != nil {
return err
}
if !result.Valid() {
return fmt.Errorf(fmt.Sprintf("%+v", result.Errors()))
}
return nil
}

const senderSchema = `
{
"$schema": "http://json-schema.org/draft-06/schema#",
"$ref": "#/definitions/SenderConfig",
"definitions": {
"SenderConfig": {
"type": "object",
"additionalProperties": false,
"properties": {
"url": {
"type": "string"
},
"method": {
"type": "string"
}
},
"required": [
"url",
"method"
],
"title": "SenderConfig"
}
}
}
`
66 changes: 66 additions & 0 deletions pkg/plugins/http/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package http

import (
"github.com/rs/zerolog"
"github.com/xmidt-org/ears/pkg/errs"
pkgplugin "github.com/xmidt-org/ears/pkg/plugin"
"github.com/xmidt-org/ears/pkg/receiver"
"github.com/xmidt-org/ears/pkg/sender"
"net/http"
)

var _ sender.Sender = (*Sender)(nil)
var _ receiver.Receiver = (*Receiver)(nil)

var (
Name = "http"
Version = "v0.0.0"
CommitID = ""
)

func NewPlugin() (*pkgplugin.Plugin, error) {
return NewPluginVersion(Name, Version, CommitID)
}

func NewPluginVersion(name string, version string, commitID string) (*pkgplugin.Plugin, error) {
return pkgplugin.NewPlugin(
pkgplugin.WithName(name),
pkgplugin.WithVersion(version),
pkgplugin.WithCommitID(commitID),
pkgplugin.WithNewReceiver(NewReceiver),
pkgplugin.WithNewSender(NewSender),
)
}

type ReceiverConfig struct {
Path string `json:"path,omitempty"`
Port *int `json:"port,omitempty"`
Method string `json:"method:omitempty"`
}

type Receiver struct {
path string
method string
port int
logger *zerolog.Logger
srv *http.Server
}

type SenderConfig struct {
Url string `json:"url,omitempty"`
Method string `json:"method,omitempty"`
}

type Sender struct {
client *http.Client
method string
url string
}

type BadHttpStatusError struct {
statusCode int
}

func (e *BadHttpStatusError) Error() string {
return errs.String("BadHttpStatusError", map[string]interface{}{"statusCode": e.statusCode}, nil)
}

0 comments on commit 2767202

Please sign in to comment.