From 34aaad7800d2fba3341a143ca722457af57b385d Mon Sep 17 00:00:00 2001 From: Peter Grlica Date: Thu, 19 Aug 2021 15:27:30 +0200 Subject: [PATCH] Refactored pipelines, errors --- app/servers.go | 4 +- pkg/apigw/ctx/ctx.go | 19 ++ pkg/apigw/filter/filter.go | 26 +++ pkg/apigw/filter/postfilter.go | 96 +++------- pkg/apigw/filter/postfilter_test.go | 71 ++++--- pkg/apigw/filter/prefilter.go | 203 +++++++------------- pkg/apigw/filter/prefilter_test.go | 190 ++++++++---------- pkg/apigw/filter/processer.go | 220 +++++++++++---------- pkg/apigw/filter/processer_test.go | 26 ++- pkg/apigw/filter/proxy/proxy.go | 114 +++++------ pkg/apigw/filter/proxy/proxy_auth.go | 14 +- pkg/apigw/filter/proxy/proxy_auth_test.go | 24 ++- pkg/apigw/filter/proxy/proxy_test.go | 23 ++- pkg/apigw/pipeline/pipeline.go | 145 +++++++------- pkg/apigw/pipeline/pipeline_test.go | 117 +++++++++--- pkg/apigw/registry/registry.go | 3 +- pkg/apigw/route.go | 31 ++- pkg/apigw/route_test.go | 93 +++++---- pkg/apigw/service.go | 76 +++++--- pkg/apigw/service_test.go | 222 +++++++++++----------- pkg/apigw/types/error.go | 23 ++- pkg/apigw/types/handler.go | 32 ++-- pkg/apigw/types/test.go | 39 +--- pkg/expr/expr.go | 2 +- pkg/expr/expr_test.go | 2 + pkg/expr/expr_types_test.go | 8 +- pkg/options/apigw.gen.go | 2 + pkg/options/apigw.yaml | 6 + 28 files changed, 913 insertions(+), 918 deletions(-) create mode 100644 pkg/apigw/ctx/ctx.go create mode 100644 pkg/apigw/filter/filter.go diff --git a/app/servers.go b/app/servers.go index b0d186912c..6b251e5bf2 100644 --- a/app/servers.go +++ b/app/servers.go @@ -106,10 +106,10 @@ func (app *CortezaApp) mountHttpRoutes(r chi.Router) { r.Route("/federation", federationRest.MountRoutes) } - // temp api gateway support + // API Gateway { apigw.Setup(options.Apigw(), service.DefaultLogger, service.DefaultStore) - r.Route("/gateway", apigw.Service().Router) + r.Route("/", apigw.Service().Router) } var fullpathDocs = options.CleanBase(ho.BaseUrl, ho.ApiBaseUrl, "docs") diff --git a/pkg/apigw/ctx/ctx.go b/pkg/apigw/ctx/ctx.go new file mode 100644 index 0000000000..a2c226802b --- /dev/null +++ b/pkg/apigw/ctx/ctx.go @@ -0,0 +1,19 @@ +package ctx + +import ( + "context" + + "github.com/cortezaproject/corteza-server/pkg/apigw/types" +) + +type ContextKey string + +const ContextKeyScope ContextKey = "scope" + +func ScopeToContext(ctx context.Context, s *types.Scp) context.Context { + return context.WithValue(ctx, ContextKeyScope, s) +} + +func ScopeFromContext(ctx context.Context) *types.Scp { + return ctx.Value(ContextKeyScope).(*types.Scp) +} diff --git a/pkg/apigw/filter/filter.go b/pkg/apigw/filter/filter.go new file mode 100644 index 0000000000..03a529985a --- /dev/null +++ b/pkg/apigw/filter/filter.go @@ -0,0 +1,26 @@ +package filter + +import ( + "github.com/cortezaproject/corteza-server/pkg/apigw/types" +) + +const ( + PreFilterWeight = iota + ProcesserWeight + PostFilterWeight +) + +func FilterWeight(w int, t types.FilterKind) int { + mul := PreFilterWeight + + switch t { + case types.PreFilter: + mul = PreFilterWeight + case types.Processer: + mul = ProcesserWeight + case types.PostFilter: + mul = PostFilterWeight + } + + return mul*100 + w +} diff --git a/pkg/apigw/filter/postfilter.go b/pkg/apigw/filter/postfilter.go index 71c03d9747..0460eec718 100644 --- a/pkg/apigw/filter/postfilter.go +++ b/pkg/apigw/filter/postfilter.go @@ -2,18 +2,22 @@ package filter import ( "bytes" - "context" "encoding/json" "fmt" "net/http" "net/url" "github.com/cortezaproject/corteza-server/pkg/apigw/types" + pe "github.com/cortezaproject/corteza-server/pkg/errors" ) type ( redirection struct { types.FilterMeta + + location *url.URL + status int + params struct { HTTPStatus int `json:"status,string"` Location string `json:"location"` @@ -32,11 +36,6 @@ type ( defaultJsonResponse struct { types.FilterMeta } - - errorHandler struct { - name string - args []string - } ) func NewRedirection() (e *redirection) { @@ -63,11 +62,7 @@ func NewRedirection() (e *redirection) { } func (h redirection) String() string { - return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label) -} - -func (h redirection) Type() types.FilterKind { - return h.Kind + return fmt.Sprintf("apigw filter %s (%s)", h.Name, h.Label) } func (h redirection) Meta() types.FilterMeta { @@ -78,61 +73,30 @@ func (h redirection) Weight() int { return h.Wgt } -func (f *redirection) Merge(params []byte) (types.Handler, error) { - err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params) - return f, err -} +func (h *redirection) Merge(params []byte) (types.Handler, error) { + err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&h.params) -func (h redirection) Exec(ctx context.Context, scope *types.Scp) error { loc, err := url.ParseRequestURI(h.params.Location) if err != nil { - return fmt.Errorf("could not redirect: %s", err) + return nil, fmt.Errorf("could not validate parameters, invalid URL: %s", err) } - status := h.params.HTTPStatus - - if !checkStatus("redirect", status) { - return fmt.Errorf("could not redirect: wrong status %d", status) + if !checkStatus("redirect", h.params.HTTPStatus) { + return nil, fmt.Errorf("could not validate parameters, wrong status %d", h.params.HTTPStatus) } - http.Redirect(scope.Writer(), scope.Request(), loc.String(), status) - - return nil -} + h.location = loc + h.status = h.params.HTTPStatus -func NewErrorHandler(name string, args []string) (e *errorHandler) { - e = &errorHandler{ - name: name, - args: args, - } - - return + return h, err } -func (pp errorHandler) Exec(ctx context.Context, scope *types.Scp, err error) { - type ( - responseHelper struct { - ErrResponse struct { - Msg string `json:"msg"` - } `json:"error"` - } - ) - - resp := responseHelper{ - ErrResponse: struct { - Msg string "json:\"msg\"" - }{ - Msg: err.Error(), - }, +func (h redirection) Handler() types.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) error { + http.Redirect(rw, r, h.location.String(), h.status) + return nil } - - // set http status code - scope.Writer().WriteHeader(http.StatusInternalServerError) - - // set body - json.NewEncoder(scope.Writer()).Encode(resp) - } func NewDefaultJsonResponse() (e *defaultJsonResponse) { @@ -146,32 +110,28 @@ func NewDefaultJsonResponse() (e *defaultJsonResponse) { } func (h defaultJsonResponse) String() string { - return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label) -} - -func (h defaultJsonResponse) Type() types.FilterKind { - return h.Kind + return fmt.Sprintf("apigw filter %s (%s)", h.Name, h.Label) } func (h defaultJsonResponse) Meta() types.FilterMeta { return h.FilterMeta } -func (h defaultJsonResponse) Weight() int { - return h.Wgt -} - func (f *defaultJsonResponse) Merge(params []byte) (h types.Handler, err error) { return f, err } -func (h defaultJsonResponse) Exec(ctx context.Context, scope *types.Scp) (err error) { - scope.Writer().Header().Set("Content-Type", "application/json") - scope.Writer().WriteHeader(http.StatusAccepted) +func (h defaultJsonResponse) Handler() types.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) error { + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(http.StatusAccepted) - _, err = scope.Writer().Write([]byte(`{}`)) + if _, err := rw.Write([]byte(`{}`)); err != nil { + return pe.Internal("could not write to body: (%v)", err) + } - return + return nil + } } func checkStatus(typ string, status int) bool { diff --git a/pkg/apigw/filter/postfilter_test.go b/pkg/apigw/filter/postfilter_test.go index 51e08747b3..50798fd1cc 100644 --- a/pkg/apigw/filter/postfilter_test.go +++ b/pkg/apigw/filter/postfilter_test.go @@ -1,7 +1,6 @@ package filter import ( - "context" "net/http" "net/http/httptest" "testing" @@ -10,12 +9,35 @@ import ( "github.com/stretchr/testify/require" ) -func Tesst_redirection(t *testing.T) { +func Test_redirectionMerge(t *testing.T) { + var ( + tcc = []tf{ + { + name: "url validation", + expr: `{"status":"301", "location": "invalid url"}`, + err: `could not validate parameters, invalid URL: parse "invalid url": invalid URI for request`, + }, + { + name: "invalid redirection status", + expr: `{"status":"400", "location": "http://redire.ct/to"}`, + err: "could not validate parameters, wrong status 400", + }, + } + ) + + for _, tc := range tcc { + t.Run(tc.name, testMerge(NewRedirection(), tc)) + } +} + +func Test_redirection(t *testing.T) { type ( tf struct { name string expr string err string + loc string + code int } ) @@ -24,43 +46,33 @@ func Tesst_redirection(t *testing.T) { { name: "simple redirection", expr: `{"status":"302", "location": "http://redire.ct/to"}`, + loc: "http://redire.ct/to", + code: 302, }, { name: "permanent redirection", expr: `{"status":"301", "location": "http://redire.ct/to"}`, - }, - { - name: "url validation", - expr: `{"status":"301", "location": "invalid url"}`, - err: `could not redirect: parse "invalid url": invalid URI for request`, - }, - { - name: "invalid redirection status", - expr: `{"status":"400", "location": "http://redire.ct/to"}`, - err: "could not redirect: wrong status 400", + loc: "http://redire.ct/to", + code: 301, }, } ) for _, tc := range tcc { - var ( - ctx = context.Background() - ) - t.Run(tc.name, func(t *testing.T) { - req := require.New(t) + var ( + req = require.New(t) + r = httptest.NewRequest(http.MethodGet, "/foo", http.NoBody) + rc = httptest.NewRecorder() + ) - r, err := http.NewRequest(http.MethodGet, "/foo", http.NoBody) + h := getHandler(NewRedirection()) + h, err := h.Merge([]byte(tc.expr)) req.NoError(err) - rc := httptest.NewRecorder() - scope := &types.Scp{"request": r, "writer": rc} - - h := NewRedirection() - h.Merge([]byte(tc.expr)) - - err = h.Exec(ctx, scope) + hn := h.Handler() + err = hn(rc, r) if tc.err != "" { req.EqualError(err, tc.err) @@ -68,8 +80,13 @@ func Tesst_redirection(t *testing.T) { } req.NoError(err) - req.Equal(h.params.Location, rc.Header().Get("Location")) - req.Equal(h.params.HTTPStatus, rc.Code) + req.Equal(tc.loc, rc.Header().Get("Location")) + req.Equal(tc.code, rc.Code) }) } } + +// hackity hack +func getHandler(h types.Handler) types.Handler { + return h +} diff --git a/pkg/apigw/filter/prefilter.go b/pkg/apigw/filter/prefilter.go index cbf7165a19..5f81a39846 100644 --- a/pkg/apigw/filter/prefilter.go +++ b/pkg/apigw/filter/prefilter.go @@ -2,17 +2,20 @@ package filter import ( "bytes" - "context" "encoding/json" + "errors" "fmt" + "net/http" "github.com/cortezaproject/corteza-server/pkg/apigw/types" + pe "github.com/cortezaproject/corteza-server/pkg/errors" "github.com/cortezaproject/corteza-server/pkg/expr" ) type ( header struct { types.FilterMeta + eval expr.Evaluable params struct { Expr string `json:"expr"` } @@ -20,6 +23,7 @@ type ( queryParam struct { types.FilterMeta + eval expr.Evaluable params struct { Expr string `json:"expr"` } @@ -27,6 +31,7 @@ type ( origin struct { types.FilterMeta + eval expr.Evaluable params struct { Expr string `json:"expr"` } @@ -52,131 +57,62 @@ func NewHeader() (v *header) { } func (h header) String() string { - return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label) -} - -func (h header) Type() types.FilterKind { - return h.Kind + return fmt.Sprintf("apigw filter %s (%s)", h.Name, h.Label) } func (h header) Meta() types.FilterMeta { return h.FilterMeta } -func (h header) Weight() int { - return h.Wgt -} - func (v *header) Merge(params []byte) (types.Handler, error) { err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params) - return v, err -} - -func (h header) Exec(ctx context.Context, scope *types.Scp) error { - vv := map[string]interface{}{} - headers := scope.Request().Header - - for k, v := range headers { - // sanitize header keys? - vv[k] = v[0] - } - - // get the request data and put it into vars - out, err := expr.NewVars(vv) if err != nil { - return err + return nil, err } - pp := expr.NewParser() - tt, err := pp.Parse(h.params.Expr) + parser := expr.NewParser() + v.eval, err = parser.Parse(v.params.Expr) if err != nil { - return fmt.Errorf("could not parse matching expression: %s", err) + return nil, fmt.Errorf("could not validate origin parameters: %s", err) } - b, err := tt.Test(ctx, out) - - if err != nil { - return fmt.Errorf("could not validate headers: %s", err) - } - - if !b { - return fmt.Errorf("could not validate headers") - } - - return nil -} - -func NewOrigin() (v *origin) { - v = &origin{} - - v.Name = "origin" - v.Label = "Origin" - v.Kind = types.PreFilter - - v.Args = []*types.FilterMetaArg{ - { - Type: "expr", - Label: "expr", - Options: map[string]interface{}{}, - }, - } - - return -} - -func (h origin) String() string { - return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label) -} - -func (h origin) Type() types.FilterKind { - return h.Kind -} - -func (h origin) Meta() types.FilterMeta { - return h.FilterMeta + return v, err } -func (h origin) Weight() int { - return h.Wgt -} +func (h header) Handler() types.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) error { + var ( + ctx = r.Context() + ) -func (v *origin) Merge(params []byte) (types.Handler, error) { - err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params) - return v, err -} + vv := map[string]interface{}{} + headers := r.Header -func (h origin) Exec(ctx context.Context, scope *types.Scp) error { - vv := map[string]interface{}{ - "origin": scope.Request().Header.Get("Origin"), - } + for k, v := range headers { + vv[k] = v[0] + } - // get the request data and put it into vars - out, err := expr.NewVars(vv) + // get the request data and put it into vars + out, err := expr.NewVars(vv) - if err != nil { - return err - } - // spew.Dump("OUT", out) - pp := expr.NewParser() - tt, err := pp.Parse(h.params.Expr) + if err != nil { + return pe.Internal("could not validate headers: (%v) (%s)", err, h.params.Expr) + } - if err != nil { - return fmt.Errorf("could not parse matching expression: %s", err) - } + b, err := h.eval.Test(ctx, out) - b, err := tt.Test(ctx, out) + if err != nil { + return pe.InvalidData("could not validate headers: (%v) (%s)", err, h.params.Expr) + } - if err != nil { - return fmt.Errorf("could not validate origin: %s", err) - } + if !b { + return pe.InvalidData("could not validate headers: (%v) (%s)", errors.New("validation failed"), h.params.Expr) + } - if !b { - return fmt.Errorf("could not validate origin") + return nil } - - return nil } func NewQueryParam() (v *queryParam) { @@ -198,57 +134,60 @@ func NewQueryParam() (v *queryParam) { } func (h queryParam) String() string { - return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label) -} - -func (h queryParam) Type() types.FilterKind { - return h.Kind + return fmt.Sprintf("apigw filter %s (%s)", h.Name, h.Label) } func (h queryParam) Meta() types.FilterMeta { return h.FilterMeta } -func (h queryParam) Weight() int { - return h.Wgt -} - func (v *queryParam) Merge(params []byte) (types.Handler, error) { err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params) - return v, err -} - -func (h queryParam) Exec(ctx context.Context, scope *types.Scp) error { - vv := map[string]interface{}{} - vals := scope.Request().URL.Query() - for k, v := range vals { - vv[k] = v[0] + if err != nil { + return nil, err } - // get the request data and put it into vars - out, err := expr.NewVars(vv) + parser := expr.NewParser() + v.eval, err = parser.Parse(v.params.Expr) if err != nil { - return err + return nil, fmt.Errorf("could not validate query parameters: %s", err) } - pp := expr.NewParser() - tt, err := pp.Parse(h.params.Expr) + return v, err +} - if err != nil { - return fmt.Errorf("could not parse matching expression: %s", err) - } +func (h *queryParam) Handler() types.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) error { + var ( + ctx = r.Context() + ) - b, err := tt.Test(ctx, out) + vv := map[string]interface{}{} + vals := r.URL.Query() - if err != nil { - return fmt.Errorf("could not validate query params: %s", err) - } + for k, v := range vals { + vv[k] = v[0] + } - if !b { - return fmt.Errorf("could not validate query params") - } + // get the request data and put it into vars + out, err := expr.NewVars(vv) + + if err != nil { + return pe.Internal("could not validate query parameters: (%v) (%s)", err, h.params.Expr) + } - return nil + b, err := h.eval.Test(ctx, out) + + if err != nil { + return pe.InvalidData("could not validate query parameters: (%v) (%s)", err, h.params.Expr) + } + + if !b { + return pe.InvalidData("could not validate query parameters: (%v) (%s)", errors.New("validation failed"), h.params.Expr) + } + + return nil + } } diff --git a/pkg/apigw/filter/prefilter_test.go b/pkg/apigw/filter/prefilter_test.go index 24eba8c717..88b51f270b 100644 --- a/pkg/apigw/filter/prefilter_test.go +++ b/pkg/apigw/filter/prefilter_test.go @@ -1,24 +1,43 @@ package filter import ( - "context" "net/http" + "net/http/httptest" "testing" "github.com/cortezaproject/corteza-server/pkg/apigw/types" "github.com/stretchr/testify/require" ) -func Test_header(t *testing.T) { - type ( - tf struct { - name string - expr string - err string - headers http.Header +type ( + tf struct { + name string + expr string + err string + url string + o string + headers http.Header + } +) + +func Test_headerMerge(t *testing.T) { + var ( + tcc = []tf{ + { + name: "non matching key", + expr: `{"expr":"Foo1 == bar\""}`, + headers: map[string][]string{"Foo": {"bar"}}, + err: "could not validate origin parameters: parsing error: Foo1 == bar\"\t:1:12 - 1:13 unexpected String while scanning operator", + }, } ) + for _, tc := range tcc { + t.Run(tc.name, testMerge(NewHeader(), tc)) + } +} + +func Test_headerHandle(t *testing.T) { var ( tcc = []tf{ { @@ -35,13 +54,13 @@ func Test_header(t *testing.T) { name: "non matching value", expr: `{"expr":"Foo == \"bar1\""}`, headers: map[string][]string{"Foo": {"bar"}}, - err: "could not validate headers", + err: `could not validate headers: (validation failed) (Foo == "bar1")`, }, { name: "non matching key", expr: `{"expr":"Foo1 == \"bar\""}`, headers: map[string][]string{"Foo": {"bar"}}, - err: "could not validate headers: failed to select 'Foo1' on *expr.Vars: no such key 'Foo1'", + err: `could not validate headers: (failed to select 'Foo1' on *expr.Vars: no such key 'Foo1') (Foo1 == "bar")`, }, { name: "regex matching key", @@ -57,44 +76,14 @@ func Test_header(t *testing.T) { ) for _, tc := range tcc { - var ( - ctx = context.Background() - ) - - t.Run(tc.name, func(t *testing.T) { - req := require.New(t) + r := httptest.NewRequest(http.MethodGet, "/foo", http.NoBody) + r.Header = tc.headers - r, err := http.NewRequest(http.MethodGet, "/foo", http.NoBody) - r.Header = tc.headers - - req.NoError(err) - - scope := &types.Scp{"request": r} - - h := NewHeader() - h.Merge([]byte(tc.expr)) - - err = h.Exec(ctx, scope) - - if tc.err != "" { - req.EqualError(err, tc.err) - } else { - req.NoError(err) - } - }) + t.Run(tc.name, testHandle(NewHeader(), r, tc)) } } -func Test_queryParam(t *testing.T) { - type ( - tf struct { - name string - expr string - err string - url string - } - ) - +func Test_queryParamMerge(t *testing.T) { var ( tcc = []tf{ { @@ -106,13 +95,12 @@ func Test_queryParam(t *testing.T) { name: "matching simple query parameter - invalid expression key", expr: `{"expr1":"foo == \"bar\""}`, url: "https://examp.le?foo=bar", - err: "could not parse matching expression: parsing error: - 1:1 unexpected EOF while scanning extensions", + err: "could not validate query parameters: parsing error: - 1:1 unexpected EOF while scanning extensions", }, { name: "matching simple query parameter - missing value", expr: `{"expr":"foo == \"bar\""}`, url: "https://examp.le?foo=bar1", - err: "could not validate query params", }, { name: "matching simple query parameter - missing value", @@ -123,96 +111,72 @@ func Test_queryParam(t *testing.T) { ) for _, tc := range tcc { - var ( - ctx = context.Background() - ) - - t.Run(tc.name, func(t *testing.T) { - req := require.New(t) - - r, err := http.NewRequest(http.MethodGet, tc.url, http.NoBody) - - req.NoError(err) - - scope := &types.Scp{"request": r} - - h := NewQueryParam() - h.Merge([]byte(tc.expr)) - - err = h.Exec(ctx, scope) - - if tc.err != "" { - req.EqualError(err, tc.err) - } else { - req.NoError(err) - } - }) + t.Run(tc.name, testMerge(NewQueryParam(), tc)) } } -func Test_origin(t *testing.T) { - type ( - tf struct { - name string - expr string - err string - o string - } - ) - +func Test_queryParamHandle(t *testing.T) { var ( tcc = []tf{ { - name: "matching simple origin value", - expr: `{"expr":"origin == \"https://www.google.com\""}`, - o: "https://www.google.com", - }, - { - name: "matching simple nonexistent origin value", - expr: `{"expr":"origin == \"https://www.google.com\""}`, - o: "", - err: "could not validate origin", + name: "matching simple query parameter", + expr: `{"expr":"foo == \"bar\""}`, + url: "https://examp.le?foo=bar", }, { - name: "matching simple origin value - invalid expression key", - expr: `{"expr1":"origin == \"https://www.google.com\""}`, - o: "", - err: "could not parse matching expression: parsing error: \t - 1:1 unexpected EOF while scanning extensions", + name: "matching simple query parameter - missing value", + expr: `{"expr":"foo == \"bar\""}`, + url: "https://examp.le?foo=bar1", + err: `could not validate query parameters: (validation failed) (foo == "bar")`, }, { - name: "matching simple origin value - invalid expression key", - expr: `{"expr1":"origin == \"https"}`, - o: "", - err: "could not parse matching expression: parsing error: \t - 1:1 unexpected EOF while scanning extensions", + name: "matching query parameter", + expr: `{"expr":"foo == \"bar-baz\""}`, + url: "https://examp.le?foo=bar-baz", }, } ) for _, tc := range tcc { + r := httptest.NewRequest(http.MethodGet, tc.url, http.NoBody) + t.Run(tc.name, testHandle(NewQueryParam(), r, tc)) + } +} + +func testMerge(h types.Handler, tc tf) func(t *testing.T) { + return func(t *testing.T) { var ( - ctx = context.Background() + req = require.New(t) ) - t.Run(tc.name, func(t *testing.T) { - req := require.New(t) - - r, err := http.NewRequest(http.MethodGet, "/foo", http.NoBody) - r.Header.Set("Origin", tc.o) + _, err := h.Merge([]byte(tc.expr)) + if tc.err != "" { + req.EqualError(err, tc.err) + } else { req.NoError(err) + } + } +} + +func testHandle(h types.Handler, r *http.Request, tc tf) func(t *testing.T) { + return func(t *testing.T) { + var ( + req = require.New(t) + ) - scope := &types.Scp{"request": r} + h, err := h.Merge([]byte(tc.expr)) - h := NewOrigin() - h.Merge([]byte(tc.expr)) + req.NoError(err) - err = h.Exec(ctx, scope) + hfn := h.Handler() - if tc.err != "" { - req.EqualError(err, tc.err) - } else { - req.NoError(err) - } - }) + err = hfn(httptest.NewRecorder(), r) + + if tc.err != "" { + req.EqualError(err, tc.err) + } else { + req.NoError(err) + } } } diff --git a/pkg/apigw/filter/processer.go b/pkg/apigw/filter/processer.go index af2bce6536..6638deaf74 100644 --- a/pkg/apigw/filter/processer.go +++ b/pkg/apigw/filter/processer.go @@ -8,9 +8,12 @@ import ( "errors" "fmt" "io" + "net/http" atypes "github.com/cortezaproject/corteza-server/automation/types" + agctx "github.com/cortezaproject/corteza-server/pkg/apigw/ctx" "github.com/cortezaproject/corteza-server/pkg/apigw/types" + pe "github.com/cortezaproject/corteza-server/pkg/errors" "github.com/cortezaproject/corteza-server/pkg/expr" "github.com/cortezaproject/corteza-server/pkg/jsenv" "go.uber.org/zap" @@ -19,15 +22,20 @@ import ( type ( workflow struct { types.FilterMeta - d types.WfExecer + d WfExecer params struct { Workflow uint64 `json:"workflow,string"` } } + WfExecer interface { + Exec(ctx context.Context, workflowID uint64, p atypes.WorkflowExecParams) (*expr.Vars, atypes.Stacktrace, error) + } + processerPayload struct { types.FilterMeta + vm jsenv.Vm log *zap.Logger @@ -38,7 +46,7 @@ type ( } ) -func NewWorkflow(wf types.WfExecer) (p *workflow) { +func NewWorkflow(wf WfExecer) (p *workflow) { p = &workflow{} p.d = wf @@ -59,99 +67,93 @@ func NewWorkflow(wf types.WfExecer) (p *workflow) { } func (h workflow) String() string { - return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label) -} - -func (h workflow) Type() types.FilterKind { - return h.Kind + return fmt.Sprintf("apigw filter %s (%s)", h.Name, h.Label) } func (h workflow) Meta() types.FilterMeta { return h.FilterMeta } -func (h workflow) Weight() int { - return h.Wgt -} - func (f *workflow) Merge(params []byte) (types.Handler, error) { err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params) return f, err } -func (h workflow) Exec(ctx context.Context, scope *types.Scp) error { - var ( - err error - ) +func (h workflow) Handler() types.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) error { + var ( + err error + ctx = r.Context() + scope = agctx.ScopeFromContext(ctx) + ) - payload, err := scope.Get("payload") + payload, err := scope.Get("payload") - if err != nil { - return err - } - - rr, err := scope.Get("request") - - if err != nil { - return err - } + if err != nil { + return pe.Internal("could not get payload: (%v)", err) + } - // setup scope for workflow - vv := map[string]interface{}{ - "payload": payload, - "request": rr, - } + // setup scope for workflow + vv := map[string]interface{}{ + "payload": payload, + "request": r, + } - // get the request data and put it into vars - in, err := expr.NewVars(vv) + // get the request data and put it into vars + in, err := expr.NewVars(vv) - if err != nil { - return err - } + if err != nil { + return pe.Internal("could not validate request data: (%v)", err) + } - wp := atypes.WorkflowExecParams{ - Trace: true, - // todo depending on settings per-route - Async: false, - // todo depending on settings per-route - Wait: true, - Input: in, - } + wp := atypes.WorkflowExecParams{ + Trace: true, + // todo depending on settings per-route + Async: false, + // todo depending on settings per-route + Wait: true, + Input: in, + } - out, _, err := h.d.Exec(ctx, h.params.Workflow, wp) + out, _, err := h.d.Exec(ctx, h.params.Workflow, wp) - if err != nil { - return err - } + if err != nil { + return pe.Internal("could not exec workflow: (%v)", err) + } - // merge out with scope - merged, err := in.Merge(out) + // merge out with scope + merged, err := in.Merge(out) - if err != nil { - return err - } + if err != nil { + return pe.Internal("could not receive workflow results: (%v)", err) + } - mm, err := expr.CastToVars(merged) + mm, err := expr.CastToVars(merged) - for k, v := range mm { - scope.Set(k, v) - } + if err != nil { + return pe.Internal("could not receive workflow results: (%v)", err) + } - ss := scope.Filter(func(k string, v interface{}) bool { - if k == "eventType" || k == "resourceType" { - return false + for k, v := range mm { + scope.Set(k, v) } - return true - }) + ss := scope.Filter(func(k string, v interface{}) bool { + if k == "eventType" || k == "resourceType" { + return false + } - scope = ss + return true + }) - scope.Set("request", rr) - scope.Set("payload", payload) + scope = ss - return err + scope.Set("request", r) + scope.Set("payload", payload) + + return nil + } } func NewPayload(l *zap.Logger) (p *processerPayload) { @@ -184,81 +186,75 @@ func NewPayload(l *zap.Logger) (p *processerPayload) { } func (h processerPayload) String() string { - return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label) -} - -func (h processerPayload) Type() types.FilterKind { - return h.Kind + return fmt.Sprintf("apigw filter %s (%s)", h.Name, h.Label) } func (h processerPayload) Meta() types.FilterMeta { return h.FilterMeta } -func (h processerPayload) Weight() int { - return h.Wgt -} - -func (f *processerPayload) Merge(params []byte) (types.Handler, error) { - err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params) +func (h *processerPayload) Merge(params []byte) (types.Handler, error) { + err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&h.params) if err != nil { return nil, err } - fn, err := base64.StdEncoding.DecodeString(f.params.Func) + fn, err := base64.StdEncoding.DecodeString(h.params.Func) if err != nil { return nil, fmt.Errorf("could not decode js func: %s", err) } - f.params.Func = string(fn) + h.params.Func = string(fn) - return f, err + if h.params.Func == "" { + return nil, errors.New("could not register function, body empty") + } + + return h, err } -func (h processerPayload) Exec(ctx context.Context, scope *types.Scp) (err error) { - log := h.log.With(zap.String("function", h.String())) +func (h processerPayload) Handler() types.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) (err error) { + var ( + ctx = r.Context() + scope = agctx.ScopeFromContext(ctx) + ) - if h.params.Func == "" { - err = errors.New("function body empty") - log.Debug("could not register function", zap.Error(err)) - return - } + scope.Set("request", r) - fn, err := h.vm.RegisterFunction(h.params.Func) + fn, err := h.vm.RegisterFunction(h.params.Func) - if err != nil { - log.Debug("could not register function", zap.Error(err)) - return - } + if err != nil { + return pe.InvalidData("could not register function: (%v)", err) + } - out, err := fn.Exec(h.vm.New(scope)) + out, err := fn.Exec(h.vm.New(scope)) - if err != nil { - log.Debug("could not exec payload function", zap.Error(err)) - return - } + if err != nil { + return pe.Internal("could not exec payload function: (%v)", err) + } - // add to scope, so next steps can get the structure - scope.Set("payload", out) - - // check if string - switch out.(type) { - case string: - // handling the newline, to keep the consistency with the json encoder - // which automatically appends the newline - _, err = scope.Writer().Write([]byte(fmt.Sprintf("%s\n", out))) - default: - err = json.NewEncoder(scope.Writer()).Encode(out) - } + // add to scope, so next steps can get the structure + scope.Set("payload", out) + + // check if string + switch out.(type) { + case string: + // handling the newline, to keep the consistency with the json encoder + // which automatically appends the newline + _, err = rw.Write([]byte(fmt.Sprintf("%s\n", out))) + default: + err = json.NewEncoder(rw).Encode(out) + } + + if err != nil { + return pe.Internal("could not write to response body: (%v)", err) + } - if err != nil { - log.Debug("could not write to body", zap.Error(err)) return } - - return } func (h processerPayload) VM() jsenv.Vm { diff --git a/pkg/apigw/filter/processer_test.go b/pkg/apigw/filter/processer_test.go index 7bdf3fb40c..b093da4388 100644 --- a/pkg/apigw/filter/processer_test.go +++ b/pkg/apigw/filter/processer_test.go @@ -10,6 +10,7 @@ import ( "strings" "testing" + agctx "github.com/cortezaproject/corteza-server/pkg/apigw/ctx" "github.com/cortezaproject/corteza-server/pkg/apigw/types" "github.com/cortezaproject/corteza-server/pkg/options" "github.com/stretchr/testify/require" @@ -21,6 +22,7 @@ func Test_processerPayload(t *testing.T) { tf struct { name string err string + errv string params string exp string rq *http.Request @@ -69,7 +71,7 @@ func Test_processerPayload(t *testing.T) { Body: ioutil.NopCloser(strings.NewReader(`[{"name":"johnny", "surname":"mnemonic"},{"name":"johnny", "surname":"knoxville"}]`)), }, params: prepareFuncPayload(``), - err: `function body empty`, + errv: `could not register function, body empty`, }, } ) @@ -77,26 +79,34 @@ func Test_processerPayload(t *testing.T) { for _, tc := range tcc { t.Run(tc.name, func(t *testing.T) { var ( - ctx = context.Background() req = require.New(t) + rc = httptest.NewRecorder() ) pp := NewPayload(zap.NewNop()) - pp.Merge([]byte(tc.params)) + _, err := pp.Merge([]byte(tc.params)) + + if tc.errv != "" { + req.EqualError(err, tc.errv) + return + } else { + req.NoError(err) + } scope := &types.Scp{ - "request": tc.rq, - "writer": httptest.NewRecorder(), - "opts": options.Apigw(), + "opts": options.Apigw(), } - err := pp.Exec(ctx, scope) + tc.rq = tc.rq.WithContext(agctx.ScopeToContext(context.Background(), scope)) + + hn := pp.Handler() + err = hn(rc, tc.rq) if tc.err != "" { req.EqualError(err, tc.err) } else { req.NoError(err) - req.Equal(tc.exp, scope.Writer().(*httptest.ResponseRecorder).Body.String()) + req.Equal(tc.exp, rc.Body.String()) } }) } diff --git a/pkg/apigw/filter/proxy/proxy.go b/pkg/apigw/filter/proxy/proxy.go index 30a82c7b3b..6995b5dd31 100644 --- a/pkg/apigw/filter/proxy/proxy.go +++ b/pkg/apigw/filter/proxy/proxy.go @@ -11,7 +11,9 @@ import ( "net/url" "time" + actx "github.com/cortezaproject/corteza-server/pkg/apigw/ctx" "github.com/cortezaproject/corteza-server/pkg/apigw/types" + pe "github.com/cortezaproject/corteza-server/pkg/errors" "go.uber.org/zap" ) @@ -38,7 +40,7 @@ type ( params struct { Location string `json:"location"` - Auth proxyAuthParams `json:"auth"` + Auth ProxyAuthParams `json:"auth"` } } ) @@ -66,21 +68,13 @@ func New(l *zap.Logger, c *http.Client, s types.SecureStorager) (p *proxy) { } func (h proxy) String() string { - return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label) -} - -func (h proxy) Type() types.FilterKind { - return h.Kind + return fmt.Sprintf("apigw filter %s (%s)", h.Name, h.Label) } func (h proxy) Meta() types.FilterMeta { return h.FilterMeta } -func (h proxy) Weight() int { - return h.Wgt -} - func (f *proxy) Merge(params []byte) (types.Handler, error) { err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params) @@ -98,71 +92,77 @@ func (f *proxy) Merge(params []byte) (types.Handler, error) { return f, err } -func (h proxy) Exec(ctx context.Context, scope *types.Scp) (err error) { - ctx, cancel := context.WithTimeout(ctx, scope.Opts().ProxyOutboundTimeout) - defer cancel() +func (h proxy) Handler() types.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) (err error) { + var ( + ctx = r.Context() + scope = actx.ScopeFromContext(ctx) + ) - req := scope.Request() - log := h.log.With(zap.String("ref", h.Name)) + ctx, cancel := context.WithTimeout(ctx, scope.Opts().ProxyOutboundTimeout) + defer cancel() - outreq := req.Clone(ctx) + log := h.log.With(zap.String("ref", h.Name)) - l, err := url.ParseRequestURI(h.params.Location) + outreq := r.Clone(ctx) - if err != nil { - return fmt.Errorf("could not parse destination location for proxying: %s", err) - } + l, err := url.ParseRequestURI(h.params.Location) - outreq.URL = l - outreq.RequestURI = "" - outreq.Method = req.Method - outreq.Host = l.Hostname() + if err != nil { + return pe.InvalidData("could not parse destination location for proxying: (%v)", err) + } - // use authservicer, set any additional headers - err = h.a.Do(outreq) + outreq.URL = l + outreq.RequestURI = "" + outreq.Method = r.Method + outreq.Host = l.Hostname() - if err != nil { - return fmt.Errorf("errors setting auth for proxying: %s", err) - } + // use authservicer, set any additional headers + err = h.a.Do(outreq) - // merge the old query params to the new request - // do not overwrite old ones - // do it after the authServicer, since we also may add them there - mergeQueryParams(req, outreq) + if err != nil { + return pe.External("could not authenticate to external auth: (%v)", err) + } - if scope.Opts().ProxyEnableDebugLog { - o, _ := httputil.DumpRequestOut(outreq, false) - log.Debug("proxy outbound request", zap.Any("request", string(o))) - } + // merge the old query params to the new request + // do not overwrite old ones + // do it after the authServicer, since we also may add them there + mergeQueryParams(r, outreq) - // temporary metrics before the proper functionality - startTime := time.Now() + if scope.Opts().ProxyEnableDebugLog { + o, _ := httputil.DumpRequestOut(outreq, false) + log.Debug("proxy outbound request", zap.Any("request", string(o))) + } - // todo - disable / enable follow redirects, already - // added to options - resp, err := h.c.Do(outreq) + // temporary metrics before the proper functionality + startTime := time.Now() - if err != nil { - return fmt.Errorf("could not proxy request: %s", err) - } + // todo - disable / enable follow redirects, already + // added to options + resp, err := h.c.Do(outreq) - if scope.Opts().ProxyEnableDebugLog { - o, _ := httputil.DumpResponse(resp, false) - log.Debug("proxy outbound response", zap.Any("request", string(o)), zap.Duration("duration", time.Since(startTime))) - } + if err != nil { + return pe.Internal("could not proxy request: (%v)", err) + } - b, err := io.ReadAll(resp.Body) + if scope.Opts().ProxyEnableDebugLog { + o, _ := httputil.DumpResponse(resp, false) + log.Debug("proxy outbound response", zap.Any("request", string(o)), zap.Duration("duration", time.Since(startTime))) + } - if err != nil { - return fmt.Errorf("could not read get body on proxy request: %s", err) - } + b, err := io.ReadAll(resp.Body) - mergeHeaders(resp.Header, scope.Writer().Header()) + if err != nil { + return pe.Internal("could not read body on proxy request: (%v)", err) + } - // add to writer - scope.Writer().Write(b) + mergeHeaders(resp.Header, rw.Header()) - return nil + // add to writer + rw.Write(b) + + return nil + } } func mergeHeaders(orig, dest http.Header) { diff --git a/pkg/apigw/filter/proxy/proxy_auth.go b/pkg/apigw/filter/proxy/proxy_auth.go index f768e8b82d..0f06a95458 100644 --- a/pkg/apigw/filter/proxy/proxy_auth.go +++ b/pkg/apigw/filter/proxy/proxy_auth.go @@ -57,7 +57,7 @@ type ( proxyAuthType string - proxyAuthParams struct { + ProxyAuthParams struct { Type proxyAuthType `json:"type"` Params map[string]interface{} `json:"params"` } @@ -75,7 +75,7 @@ type ( } ) -func NewProxyAuthHeader(p proxyAuthParams) (s proxyAuthServicerHeader, err error) { +func NewProxyAuthHeader(p ProxyAuthParams) (s proxyAuthServicerHeader, err error) { s = proxyAuthServicerHeader{ params: p.Params, } @@ -83,7 +83,7 @@ func NewProxyAuthHeader(p proxyAuthParams) (s proxyAuthServicerHeader, err error return } -func NewProxyAuthQuery(p proxyAuthParams) (s proxyAuthServicerQuery, err error) { +func NewProxyAuthQuery(p ProxyAuthParams) (s proxyAuthServicerQuery, err error) { s = proxyAuthServicerQuery{ params: p.Params, } @@ -91,7 +91,7 @@ func NewProxyAuthQuery(p proxyAuthParams) (s proxyAuthServicerQuery, err error) return } -func NewProxyAuthBasic(p proxyAuthParams) (s proxyAuthServicerBasic, err error) { +func NewProxyAuthBasic(p ProxyAuthParams) (s proxyAuthServicerBasic, err error) { var ( ok bool user, pass string @@ -112,7 +112,7 @@ func NewProxyAuthBasic(p proxyAuthParams) (s proxyAuthServicerBasic, err error) return } -func NewProxyAuthOauth2(p proxyAuthParams, c *http.Client, s types.SecureStorager) (ss proxyAuthServicerOauth2, err error) { +func NewProxyAuthOauth2(p ProxyAuthParams, c *http.Client, s types.SecureStorager) (ss proxyAuthServicerOauth2, err error) { var ( ok bool client, secret, tokenUrl string @@ -149,7 +149,7 @@ func NewProxyAuthOauth2(p proxyAuthParams, c *http.Client, s types.SecureStorage return } -func NewProxyAuthJWT(p proxyAuthParams) (ss proxyAuthServicerJWT, err error) { +func NewProxyAuthJWT(p ProxyAuthParams) (ss proxyAuthServicerJWT, err error) { var ( ok bool jwt string @@ -167,7 +167,7 @@ func NewProxyAuthJWT(p proxyAuthParams) (ss proxyAuthServicerJWT, err error) { return } -func NewProxyAuthServicer(c *http.Client, p proxyAuthParams, s types.SecureStorager) (ProxyAuthServicer, error) { +func NewProxyAuthServicer(c *http.Client, p ProxyAuthParams, s types.SecureStorager) (ProxyAuthServicer, error) { switch p.Type { case proxyAuthTypeHeader: return NewProxyAuthHeader(p) diff --git a/pkg/apigw/filter/proxy/proxy_auth_test.go b/pkg/apigw/filter/proxy/proxy_auth_test.go index cce37cabb8..ce2d0794c3 100644 --- a/pkg/apigw/filter/proxy/proxy_auth_test.go +++ b/pkg/apigw/filter/proxy/proxy_auth_test.go @@ -14,7 +14,7 @@ func Test_authDo(t *testing.T) { name string err string errv string - params proxyAuthParams + params ProxyAuthParams exp http.Header } ) @@ -23,7 +23,7 @@ func Test_authDo(t *testing.T) { tcc = []tf{ { name: "auth header match headers", - params: proxyAuthParams{ + params: ProxyAuthParams{ Type: proxyAuthTypeHeader, Params: map[string]interface{}{ "Client-Id": "123455", @@ -37,7 +37,7 @@ func Test_authDo(t *testing.T) { }, { name: "auth header match canonicalized headers", - params: proxyAuthParams{ + params: ProxyAuthParams{ Type: proxyAuthTypeHeader, Params: map[string]interface{}{ "camelCaseHeader": "123455", @@ -49,7 +49,7 @@ func Test_authDo(t *testing.T) { }, { name: "auth basic match headers", - params: proxyAuthParams{ + params: ProxyAuthParams{ Type: proxyAuthTypeBasic, Params: map[string]interface{}{ "username": "user", @@ -60,7 +60,7 @@ func Test_authDo(t *testing.T) { }, { name: "auth basic match headers fail user validation", - params: proxyAuthParams{ + params: ProxyAuthParams{ Type: proxyAuthTypeBasic, Params: map[string]interface{}{"password": "pass1234"}, }, @@ -69,7 +69,7 @@ func Test_authDo(t *testing.T) { }, { name: "auth basic match headers fail pass validation", - params: proxyAuthParams{ + params: ProxyAuthParams{ Type: proxyAuthTypeBasic, Params: map[string]interface{}{"username": "user"}, }, @@ -78,9 +78,19 @@ func Test_authDo(t *testing.T) { }, { name: "noop default fallback", - params: proxyAuthParams{}, + params: ProxyAuthParams{}, exp: http.Header{}, }, + { + name: "auth JWT token", + params: ProxyAuthParams{ + Type: proxyAuthTypeJWT, + Params: map[string]interface{}{"jwt": "1234"}, + }, + exp: http.Header{ + "Authorization": []string{"Bearer 1234"}, + }, + }, } ) diff --git a/pkg/apigw/filter/proxy/proxy_test.go b/pkg/apigw/filter/proxy/proxy_test.go index 4d800b80a3..eb86baa4ec 100644 --- a/pkg/apigw/filter/proxy/proxy_test.go +++ b/pkg/apigw/filter/proxy/proxy_test.go @@ -11,6 +11,7 @@ import ( "strings" "testing" + agctx "github.com/cortezaproject/corteza-server/pkg/apigw/ctx" "github.com/cortezaproject/corteza-server/pkg/apigw/types" "github.com/cortezaproject/corteza-server/pkg/options" "github.com/stretchr/testify/require" @@ -94,7 +95,7 @@ func Test_proxy(t *testing.T) { { name: "proxy processer params parse error", params: `{"location": "invalid url", "auth": {"type": "header", "params": {}}}`, - err: `could not parse destination location for proxying: parse "invalid url": invalid URI for request`, + err: `could not parse destination location for proxying: (parse "invalid url": invalid URI for request)`, }, { name: "proxy processer params request error", @@ -105,7 +106,7 @@ func Test_proxy(t *testing.T) { } }, params: `{"location": "https://example.com", "auth": {"type": "header", "params": {}}}`, - err: `could not proxy request: Post "https://example.com": error on client.Do`, + err: `could not proxy request: (Post "https://example.com": error on client.Do)`, }, { name: "proxy processer hop headers removed", @@ -159,7 +160,7 @@ func Test_proxy(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var ( - ctx = context.Background() + rc = httptest.NewRecorder() req = require.New(t) c = http.DefaultClient rq = tc.rq @@ -170,26 +171,28 @@ func Test_proxy(t *testing.T) { } if rq == nil { - rq, _ = http.NewRequest("POST", "/foo", strings.NewReader(`custom request body`)) + rq = httptest.NewRequest("POST", "/foo", strings.NewReader(`custom request body`)) } proxy := New(zap.NewNop(), c, struct{}{}) proxy.Merge([]byte(tc.params)) scope := &types.Scp{ - "request": rq, - "writer": httptest.NewRecorder(), - "opts": options.Apigw(), + "opts": options.Apigw(), } - err := proxy.Exec(ctx, scope) + ctx := agctx.ScopeToContext(context.Background(), scope) + rq = rq.WithContext(ctx) + + hn := proxy.Handler() + err := hn(rc, rq) if tc.err != "" { req.EqualError(err, tc.err) } else { req.NoError(err) - req.Equal(tc.exp.Header, scope.Writer().(*httptest.ResponseRecorder).Header()) - req.Equal(tc.exp.Body, scope.Writer().(*httptest.ResponseRecorder).Body) + req.Equal(tc.exp.Header, rc.Header()) + req.Equal(tc.exp.Body, rc.Body) } }) } diff --git a/pkg/apigw/pipeline/pipeline.go b/pkg/apigw/pipeline/pipeline.go index 15db76721e..c89a503017 100644 --- a/pkg/apigw/pipeline/pipeline.go +++ b/pkg/apigw/pipeline/pipeline.go @@ -1,122 +1,107 @@ package pipeline import ( - "context" + "net/http" "sort" "time" "github.com/cortezaproject/corteza-server/pkg/apigw/types" + "github.com/go-chi/chi" "go.uber.org/zap" ) type ( - Worker interface { - types.Execer - types.Stringer - types.Sorter + Worker struct { + Handler func(rw http.ResponseWriter, r *http.Request) error + Weight int + Name string } - workerSet []Worker - - workers struct { - prefilter workerSet - processer workerSet - postfilter workerSet - } + workerSet []*Worker Pl struct { - w workers - err types.ErrorHandler - log *zap.Logger + workers workerSet + err types.ErrorHandlerFunc + log *zap.Logger } ) func NewPipeline(log *zap.Logger) *Pl { + var ( + defaultErrorHandler = types.NewDefaultErrorHandler(log) + ) + return &Pl{ log: log, - w: workers{}, - err: types.DefaultErrorHandler{}, + err: defaultErrorHandler.Handler(), } } -func (pp *Pl) Error() types.ErrorHandler { +func (pp *Pl) Error() types.ErrorHandlerFunc { return pp.err } -// Exec takes care of error handling and main -// functionality that takes place in worker -func (pp *Pl) Exec(ctx context.Context, scope *types.Scp, async bool) (err error) { - err = pp.process(ctx, scope, pp.w.prefilter...) - - if err != nil { - return - } - - if async { - go pp.process(ctx, scope, pp.w.processer...) - } else { - err = pp.process(ctx, scope, pp.w.processer...) - - if err != nil { - return - } - } - - err = pp.process(ctx, scope, pp.w.postfilter...) - - if err != nil { - return - } - - return -} - -// Add registers a new worker with parameters -// fetched from store -func (pp *Pl) Add(w Worker) { - var pipe *workerSet - - switch w.Type() { - case types.PreFilter: - pipe = &pp.w.prefilter - case types.Processer: - pipe = &pp.w.processer - case types.PostFilter: - pipe = &pp.w.postfilter - } - - *pipe = append(*pipe, w) - sort.Sort(pipe) - - pp.log.Debug("registered worker", zap.Any("worker", w.String())) -} - // add error handler -func (pp *Pl) ErrorHandler(ff types.ErrorHandler) { +func (pp *Pl) ErrorHandler(ff types.ErrorHandlerFunc) { pp.err = ff } -func (pp *Pl) process(ctx context.Context, scope *types.Scp, w ...Worker) (err error) { - for _, w := range w { - pp.log.Debug("started worker", zap.Any("worker", w.String())) +// add filter +func (pp *Pl) Add(w *Worker) { + pp.workers = append(pp.workers, w) + sort.Sort(pp.workers) +} - start := time.Now() - err = w.Exec(ctx, scope) - elapsed := time.Since(start) +func (pp *Pl) AddHandler(h http.Handler) {} - pp.log.Debug("finished worker", zap.Any("worker", w.String()), zap.Duration("duration", elapsed)) +func (pp *Pl) Handler() http.Handler { + var ( + middleware []func(http.Handler) http.Handler + ) - if err != nil { - pp.log.Debug("could not execute worker", zap.Error(err)) - return - } + for _, wrker := range pp.workers { + middleware = append(middleware, pp.makeHandler(*wrker)) } - return + return chi.Chain(middleware...).Handler(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {})) +} + +func (pp *Pl) makeHandler(hh Worker) func(next http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + var ( + start = time.Now() + ) + + pp.log.Debug("started processing", zap.String("filter", hh.Name)) + + // if w.async { + // ctx = context.Background() + // r.WithContext(context.Background()) + // go w.handler(rw, r) + // next.ServeHTTP(rw, r) + // } else { + + err := hh.Handler(rw, r) + + pp.log.Debug("finished processing", + zap.String("filter", hh.Name), + zap.Duration("duration", time.Since(start))) + + if err != nil { + pp.err(rw, r, err) + return + } else { + next.ServeHTTP(rw, r) + } + // } + + }) + } } func (a workerSet) Len() int { return len(a) } func (a workerSet) Less(i, j int) bool { - return a[i].Weight() < a[j].Weight() + return a[i].Weight < a[j].Weight } func (a workerSet) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/pkg/apigw/pipeline/pipeline_test.go b/pkg/apigw/pipeline/pipeline_test.go index c68be7c439..4561c6f582 100644 --- a/pkg/apigw/pipeline/pipeline_test.go +++ b/pkg/apigw/pipeline/pipeline_test.go @@ -1,8 +1,9 @@ package pipeline import ( - "context" - "fmt" + "errors" + "net/http" + "net/http/httptest" "testing" "github.com/cortezaproject/corteza-server/pkg/apigw/types" @@ -10,6 +11,10 @@ import ( "go.uber.org/zap" ) +var ( + mockEmptyHandler = func(rw http.ResponseWriter, r *http.Request) (err error) { return } +) + func NewPl() *Pl { return NewPipeline(zap.NewNop()) } @@ -20,51 +25,101 @@ func Test_pipelineAdd(t *testing.T) { ) p := NewPl() - p.Add(types.MockExecer{}) + p.Add(&Worker{ + Handler: mockEmptyHandler, + Weight: 0, + Name: "mockWorker", + }) - req.Len(p.w.prefilter, 1) + req.Len(p.workers, 1) } -func Test_pipelineExec(t *testing.T) { +func Test_pipelineHandleMultiple(t *testing.T) { var ( - ctx = context.Background() - req = require.New(t) - scope = &types.Scp{"foo": 1} + req = require.New(t) + rr = httptest.NewRecorder() + p = NewPl() + + first = types.MockHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request) error { + rw.Write([]byte(`first`)) + return nil + }, + } + + second = types.MockHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request) error { + rw.Write([]byte(`second`)) + return nil + }, + } ) - p := NewPl() - p.Add(types.MockExecer{ - Exec_: func(c context.Context, s *types.Scp) (err error) { - s.Set("foo", 2) - return nil - }, + p.Add(&Worker{ + Handler: first.Handler(), + Weight: 5, + Name: "mockHandler", }) - err := p.Exec(ctx, scope, false) - - req.NoError(err) + p.Add(&Worker{ + Handler: second.Handler(), + Weight: 0, + Name: "mockHandler", + }) - foo, err := scope.Get("foo") + p.Handler().ServeHTTP(rr, &http.Request{}) - req.NoError(err) - req.Equal(2, foo) + req.Equal(`secondfirst`, rr.Body.String()) } func Test_pipelineExecErr(t *testing.T) { + type ( + tf struct { + name string + mh types.MockHandler + w *Worker + wgt int + exp string + } + ) + var ( - ctx = context.Background() - req = require.New(t) - scope = &types.Scp{"foo": 1} + tcc = []tf{ + { + name: "matching simple", + mh: types.MockHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request) error { + return errors.New("triggered") + }}, + w: &Worker{}, + exp: `{"error":{"message":"triggered"}}` + "\n", + }, + { + name: "matching simple", + mh: types.MockHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request) error { + rw.Write([]byte(`foobar`)) + return nil + }}, + w: &Worker{}, + exp: `foobar`, + }, + } ) - p := NewPl() - p.Add(types.MockExecer{ - Exec_: func(c context.Context, s *types.Scp) (err error) { - return fmt.Errorf("error returned") - }, - }) + for _, tc := range tcc { + var ( + p = NewPl() + rr = httptest.NewRecorder() + req = require.New(t) + ) + + tc.w.Handler = tc.mh.Handler() + + p.Add(tc.w) + p.Handler().ServeHTTP(rr, &http.Request{}) - err := p.Exec(ctx, scope, false) + req.Equal(tc.exp, rr.Body.String()) + } - req.Error(err, "error returned") } diff --git a/pkg/apigw/registry/registry.go b/pkg/apigw/registry/registry.go index b2eeaad35b..a3d1739f73 100644 --- a/pkg/apigw/registry/registry.go +++ b/pkg/apigw/registry/registry.go @@ -60,7 +60,6 @@ func (r *Registry) All() (list types.FilterMetaList) { func (r *Registry) Preload() { // prefilters r.Add("queryParam", filter.NewQueryParam()) - r.Add("origin", filter.NewOrigin()) r.Add("header", filter.NewHeader()) // processers @@ -73,6 +72,6 @@ func (r *Registry) Preload() { r.Add("defaultJsonResponse", filter.NewDefaultJsonResponse()) } -func NewWorkflow() (wf types.WfExecer) { +func NewWorkflow() (wf filter.WfExecer) { return service.Workflow(&zap.Logger{}, options.CorredorOpt{}) } diff --git a/pkg/apigw/route.go b/pkg/apigw/route.go index 46471d44e0..c89534b51b 100644 --- a/pkg/apigw/route.go +++ b/pkg/apigw/route.go @@ -9,7 +9,7 @@ import ( "net/http/httputil" "time" - "github.com/cortezaproject/corteza-server/pkg/apigw/pipeline" + actx "github.com/cortezaproject/corteza-server/pkg/apigw/ctx" "github.com/cortezaproject/corteza-server/pkg/apigw/types" "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/options" @@ -25,7 +25,9 @@ type ( opts *options.ApigwOpt log *zap.Logger - pipe *pipeline.Pl + + handler http.Handler + errHandler types.ErrorHandlerFunc } routeMeta struct { @@ -38,43 +40,38 @@ func (r route) ServeHTTP(w http.ResponseWriter, req *http.Request) { var ( ctx = auth.SetIdentityToContext(req.Context(), auth.ServiceUser()) scope = types.Scp{} + start = time.Now() ) + r.log.Debug("started serving route") + b, _ := io.ReadAll(req.Body) body := string(b) // write again req.Body = ioutil.NopCloser(bytes.NewBuffer(b)) - scope.Set("request", req) - scope.Set("writer", w) scope.Set("opts", r.opts) scope.Set("payload", body) if err := r.validate(req); err != nil { r.log.Debug("error validating request on route", zap.Error(err)) - r.pipe.Error().Exec(ctx, &scope, fmt.Errorf("could not validate request: %s", err)) + r.errHandler(w, req, err) return } if r.opts.LogEnabled { - o, _ := httputil.DumpRequest(req, false) + o, _ := httputil.DumpRequest(req, r.opts.LogRequestBody) r.log.Debug("incoming request", zap.Any("request", string(o))) } - start := time.Now() - - err := r.pipe.Exec(ctx, &scope, r.meta.async) + req = req.WithContext(actx.ScopeToContext(ctx, &scope)) - elapsed := time.Since(start) + r.handler.ServeHTTP(w, req) - if err != nil { - // call the error handler - r.log.Debug("calling default error handler on error") - r.pipe.Error().Exec(ctx, &scope, err) - } - - r.log.Debug("finished serving route", zap.String("route", r.String()), zap.Duration("duration", elapsed)) + r.log.Debug("finished serving route", + zap.Duration("duration", time.Since(start)), + ) } func (r route) validate(req *http.Request) (err error) { diff --git a/pkg/apigw/route_test.go b/pkg/apigw/route_test.go index 90da3d3f5b..f803c1affb 100644 --- a/pkg/apigw/route_test.go +++ b/pkg/apigw/route_test.go @@ -1,7 +1,6 @@ package apigw import ( - "context" "errors" "net/http" "net/http/httptest" @@ -18,68 +17,68 @@ func Test_pl(t *testing.T) { type ( tf struct { name string - handler pipeline.Worker method string - errHandler types.ErrorHandler + expError string expStatus int - expError error + handler *types.MockHandler + errHandler *types.MockErrorHandler } ) var ( tcc = []tf{ { - name: "successful exec", - handler: types.MockExecer{ - Exec_: func(c context.Context, s *types.Scp) (err error) { - s.Writer().WriteHeader(http.StatusTemporaryRedirect) - return - }, - }, - errHandler: types.MockErrorExecer{ - Exec_: func(c context.Context, s *types.Scp, e error) { - s.Writer().Write([]byte(e.Error())) + name: "successful handler", + handler: &types.MockHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request) error { + rw.WriteHeader(http.StatusTemporaryRedirect) + return nil }, }, method: "POST", expStatus: http.StatusTemporaryRedirect, - expError: nil, + expError: "", }, { - name: "unsuccessful exec", - handler: types.MockExecer{ - Exec_: func(c context.Context, s *types.Scp) (err error) { - s.Writer().WriteHeader(http.StatusTemporaryRedirect) + name: "unsuccessful handle custom error response", + handler: &types.MockHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request) error { + rw.WriteHeader(http.StatusTemporaryRedirect) return errors.New("test error") }, }, - errHandler: types.MockErrorExecer{ - Exec_: func(c context.Context, s *types.Scp, e error) { - s.Writer().WriteHeader(http.StatusInternalServerError) - s.Writer().Write([]byte(e.Error())) + errHandler: &types.MockErrorHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request, err error) { + rw.Write([]byte("custom error response: " + err.Error())) }, }, method: "POST", expStatus: http.StatusTemporaryRedirect, - expError: errors.New("test error"), + expError: "custom error response: test error", }, { - name: "request method validation fail", - handler: types.MockExecer{ - Exec_: func(c context.Context, s *types.Scp) (err error) { - s.Writer().WriteHeader(http.StatusTemporaryRedirect) + name: "unsuccessful handle default error response", + handler: &types.MockHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request) error { + rw.WriteHeader(http.StatusTemporaryRedirect) return errors.New("test error") }, }, - errHandler: types.MockErrorExecer{ - Exec_: func(c context.Context, s *types.Scp, e error) { - s.Writer().WriteHeader(http.StatusInternalServerError) - s.Writer().Write([]byte(e.Error())) + method: "POST", + expStatus: http.StatusTemporaryRedirect, + expError: "{\"error\":{\"message\":\"test error\"}}\n", + }, + { + name: "request method validation fail", + handler: &types.MockHandler{ + Handler_: func(rw http.ResponseWriter, r *http.Request) error { + rw.WriteHeader(http.StatusTemporaryRedirect) + return errors.New("test error") }, }, method: "GET", expStatus: http.StatusInternalServerError, - expError: errors.New("could not validate request: invalid method POST"), + expError: "{\"error\":{\"message\":\"invalid method POST\"}}\n", }, } ) @@ -92,28 +91,28 @@ func Test_pl(t *testing.T) { pipe = pipeline.NewPipeline(zap.NewNop()) ) - r, err := http.NewRequest("POST", "/foo", http.NoBody) - req.NoError(err) + r := httptest.NewRequest("POST", "/foo", http.NoBody) + + pipe.Add(&pipeline.Worker{ + Handler: tc.handler.Handler(), + }) - pipe.Add(tc.handler) - pipe.ErrorHandler(tc.errHandler) + if tc.errHandler != nil { + pipe.ErrorHandler(tc.errHandler.Handler()) + } route := &route{ - method: tc.method, - pipe: pipe, - log: zap.NewNop(), - opts: options.Apigw(), + method: tc.method, + log: zap.NewNop(), + opts: options.Apigw(), + handler: pipe.Handler(), + errHandler: pipe.Error(), } route.ServeHTTP(rr, r) - expError := "" - if tc.expError != nil { - expError = tc.expError.Error() - } - req.Equal(tc.expStatus, rr.Result().StatusCode) - req.Equal(expError, rr.Body.String()) + req.Equal(tc.expError, rr.Body.String()) }) } } diff --git a/pkg/apigw/service.go b/pkg/apigw/service.go index 3f53ee9767..9f43bc8d24 100644 --- a/pkg/apigw/service.go +++ b/pkg/apigw/service.go @@ -3,6 +3,8 @@ package apigw import ( "context" "encoding/json" + "fmt" + "math" "github.com/cortezaproject/corteza-server/pkg/apigw/filter" "github.com/cortezaproject/corteza-server/pkg/apigw/filter/proxy" @@ -62,8 +64,8 @@ func New(opts *options.ApigwOpt, logger *zap.Logger, storer storer) *apigw { opts: opts, log: logger, storer: storer, - reload: make(chan bool), reg: reg, + reload: make(chan bool), } } @@ -167,63 +169,91 @@ func (s *apigw) Init(ctx context.Context, route ...*route) { } for _, r := range s.routes { - hasPostFilters = false - log := s.log.With(zap.String("route", r.String())) + var ( + log = s.log.With(zap.String("route", r.String())) + pipe = pipeline.NewPipeline(log) + ) - r.pipe = pipeline.NewPipeline(log) + hasPostFilters = false r.opts = s.opts r.log = log regFilters, err := s.loadFilters(ctx, r.ID) if err != nil { - log.Error("could not load functions for route", zap.Error(err)) + log.Error("could not load filters for route", zap.Error(err)) continue } - r.pipe.ErrorHandler(filter.NewErrorHandler("error handler expediter", []string{})) - for _, f := range regFilters { - h, err := s.reg.Get(f.Ref) - - if err != nil { - log.Error("could not register filter", zap.Error(err)) - continue - } + flog := log.With(zap.String("ref", f.Ref)) - enc, err := json.Marshal(f.Params) + ff, err := s.registerFilter(f, r) if err != nil { - log.Error("could not load params for filter", zap.String("ref", f.Ref), zap.Error(err)) + flog.Error("could not register filter", zap.Error(err)) continue } - h, err = s.reg.Merge(h, enc) - - if err != nil { - log.Error("could not merge params to handler", zap.String("ref", f.Ref), zap.Error(err)) - continue - } + pipe.Add(ff) // check if it's a postfilter for async support if f.Kind == string(types.PostFilter) { hasPostFilters = true } - r.pipe.Add(h) + flog.Debug("registered filter") } + r.handler = pipe.Handler() + r.errHandler = pipe.Error() + // add default postfilter on async // routes if not present if r.meta.async && !hasPostFilters { log.Info("registering default postfilter", zap.Error(err)) - r.pipe.Add(defaultPostFilter) + + pipe.Add(&pipeline.Worker{ + Handler: defaultPostFilter.Handler(), + Name: defaultPostFilter.String(), + Weight: math.MaxInt8, + }) } log.Debug("successfuly registered route") } } +func (s *apigw) registerFilter(f *st.ApigwFilter, r *route) (ff *pipeline.Worker, err error) { + handler, err := s.reg.Get(f.Ref) + + if err != nil { + return + } + + enc, err := json.Marshal(f.Params) + + if err != nil { + err = fmt.Errorf("could not load params for filter: %s", err) + return + } + + handler, err = s.reg.Merge(handler, enc) + + if err != nil { + err = fmt.Errorf("could not merge params to handler: %s", err) + return + } + + ff = &pipeline.Worker{ + Handler: handler.Handler(), + Name: handler.String(), + Weight: filter.FilterWeight(int(f.Weight), types.FilterKind(f.Kind)), + } + + return +} + func (s *apigw) Funcs(kind string) (list types.FilterMetaList) { list = s.reg.All() diff --git a/pkg/apigw/service_test.go b/pkg/apigw/service_test.go index 14933549cf..fe64da38b1 100644 --- a/pkg/apigw/service_test.go +++ b/pkg/apigw/service_test.go @@ -2,15 +2,18 @@ package apigw import ( "context" + "errors" "testing" + "github.com/cortezaproject/corteza-server/pkg/apigw/registry" "github.com/cortezaproject/corteza-server/pkg/apigw/types" st "github.com/cortezaproject/corteza-server/system/types" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) type ( - // overriding mockHandler with only + // overriding types.MockHandler with only // the merge function mockExistingHandler struct { *types.MockHandler @@ -70,116 +73,113 @@ func Test_serviceLoadFunctions(t *testing.T) { req.Len(r, 2) } -// func Test_serviceInit(t *testing.T) { -// type ( -// tf struct { -// name string -// expLen int -// st types.MockStorer -// reg *registry.Registry -// } -// ) - -// var ( -// tcc = []tf{ -// { -// name: "could not register 1 function for route", -// st: types.MockStorer{ -// r: func(c context.Context, arf st.ApigwRouteFilter) (s st.ApigwRouteSet, f st.ApigwRouteFilter, err error) { -// s = st.ApigwRouteSet{ -// {ID: 1, Endpoint: "/endpoint", Method: "GET", Debug: false, Enabled: true, Group: 0}, -// } -// return -// }, -// F: func(c context.Context, aff st.ApigwFilterFilter) (s st.ApigwFilterSet, f st.ApigwFilterFilter, err error) { -// s = st.ApigwFilterSet{ -// {ID: 1, Route: 1, Ref: "testExistingFunction"}, -// {ID: 2, Route: 1, Ref: "testNotExistingFunction"}, -// } -// return -// }, -// }, -// reg: ®istry{ -// h: map[string]types.Handler{"testExistingFunction": &mockHandler{}}, -// }, -// expLen: 1, -// }, -// { -// name: "successful register of 2 functions for route", -// st: types.MockStorer{ -// r: func(c context.Context, arf st.ApigwRouteFilter) (s st.ApigwRouteSet, f st.ApigwRouteFilter, err error) { -// s = st.ApigwRouteSet{ -// {ID: 1, Endpoint: "/endpoint", Method: "GET", Debug: false, Enabled: true, Group: 0}, -// } -// return -// }, -// F: func(c context.Context, aff st.ApigwFilterFilter) (s st.ApigwFilterSet, f st.ApigwFilterFilter, err error) { -// s = st.ApigwFilterSet{ -// {ID: 1, Route: 1, Ref: "testExistingFunction"}, -// {ID: 2, Route: 1, Ref: "testExistingFunction"}, -// } -// return -// }, -// }, -// reg: ®istry{ -// h: map[string]types.Handler{"testExistingFunction": &mockHandler{}}, -// }, -// expLen: 2, -// }, -// { -// name: "could not merge params for function", -// st: types.MockStorer{ -// r: func(c context.Context, arf st.ApigwRouteFilter) (s st.ApigwRouteSet, f st.ApigwRouteFilter, err error) { -// s = st.ApigwRouteSet{ -// {ID: 1, Endpoint: "/endpoint", Method: "GET", Debug: false, Enabled: true, Group: 0}, -// } -// return -// }, -// F: func(c context.Context, aff st.ApigwFilterFilter) (s st.ApigwFilterSet, f st.ApigwFilterFilter, err error) { -// s = st.ApigwFilterSet{ -// {ID: 1, Route: 1, Ref: "testExistingFunction", Params: st.ApigwFilterParams{}}, -// } -// return -// }, -// }, -// reg: ®istry.Registry{ -// h: map[string]types.Handler{ -// "testExistingFunction": &mockExistingHandler{ -// merge: func(params []byte) (types.Handler, error) { -// return nil, errors.New("testttt") -// }, -// }, -// }, -// }, -// expLen: 0, -// }, -// } -// ) - -// for _, tc := range tcc { -// t.Run(tc.name, func(t *testing.T) { -// var ( -// req = require.New(t) -// ctx = context.Background() -// ) - -// service := &apigw{ -// log: zap.NewNop(), -// storer: tc.st, -// reg: tc.reg, -// } - -// rr, err := service.loadRoutes(ctx) -// req.NoError(err) - -// service.Init(ctx, rr...) - -// req.NotEmpty(service.routes) -// req.Len(service.routes[0].pipe.w, tc.expLen) -// }) -// } - -// } +func Test_serviceInit(t *testing.T) { + type ( + tf struct { + name string + expLen int + st types.MockStorer + reg map[string]types.Handler + } + ) + + var ( + tcc = []tf{ + { + name: "could not register 1 function for route", + st: types.MockStorer{ + R: func(c context.Context, arf st.ApigwRouteFilter) (s st.ApigwRouteSet, f st.ApigwRouteFilter, err error) { + s = st.ApigwRouteSet{ + {ID: 1, Endpoint: "/endpoint", Method: "GET", Enabled: true, Group: 0}, + } + return + }, + F: func(c context.Context, aff st.ApigwFilterFilter) (s st.ApigwFilterSet, f st.ApigwFilterFilter, err error) { + s = st.ApigwFilterSet{ + {ID: 1, Route: 1, Ref: "testExistingFilter"}, + {ID: 2, Route: 1, Ref: "testNotExistingFunction"}, + } + return + }, + }, + reg: map[string]types.Handler{"testExistingFilter": &types.MockHandler{}}, + expLen: 1, + }, + { + name: "successful register of 2 functions for route", + st: types.MockStorer{ + R: func(c context.Context, arf st.ApigwRouteFilter) (s st.ApigwRouteSet, f st.ApigwRouteFilter, err error) { + s = st.ApigwRouteSet{ + {ID: 1, Endpoint: "/endpoint", Method: "GET", Enabled: true, Group: 0}, + } + return + }, + F: func(c context.Context, aff st.ApigwFilterFilter) (s st.ApigwFilterSet, f st.ApigwFilterFilter, err error) { + s = st.ApigwFilterSet{ + {ID: 1, Route: 1, Ref: "testExistingFilter"}, + {ID: 2, Route: 1, Ref: "testExistingFilter"}, + } + return + }, + }, + reg: map[string]types.Handler{"testExistingFilter": &types.MockHandler{}}, + expLen: 2, + }, + { + name: "could not merge params for function", + st: types.MockStorer{ + R: func(c context.Context, arf st.ApigwRouteFilter) (s st.ApigwRouteSet, f st.ApigwRouteFilter, err error) { + s = st.ApigwRouteSet{ + {ID: 1, Endpoint: "/endpoint", Method: "GET", Enabled: true, Group: 0}, + } + return + }, + F: func(c context.Context, aff st.ApigwFilterFilter) (s st.ApigwFilterSet, f st.ApigwFilterFilter, err error) { + s = st.ApigwFilterSet{ + {ID: 1, Route: 1, Ref: "testExistingFilter", Params: st.ApigwFilterParams{}}, + } + return + }, + }, + reg: map[string]types.Handler{"testExistingFilter": &mockExistingHandler{ + merge: func(params []byte) (types.Handler, error) { + return nil, errors.New("testttt") + }, + }}, + expLen: 0, + }, + } + ) + + for _, tc := range tcc { + t.Run(tc.name, func(t *testing.T) { + var ( + req = require.New(t) + ctx = context.Background() + ) + + reg := registry.NewRegistry() + + for hn, h := range tc.reg { + reg.Add(hn, h) + } + + service := &apigw{ + log: zap.NewNop(), + storer: tc.st, + reg: reg, + } + + rr, err := service.loadRoutes(ctx) + req.NoError(err) + + service.Init(ctx, rr...) + + req.NotEmpty(service.routes) + }) + } + +} func (h mockExistingHandler) Merge(params []byte) (types.Handler, error) { return h.merge(params) diff --git a/pkg/apigw/types/error.go b/pkg/apigw/types/error.go index 477c9a297a..99f0020494 100644 --- a/pkg/apigw/types/error.go +++ b/pkg/apigw/types/error.go @@ -1,18 +1,27 @@ package types import ( - "context" "net/http" + + "github.com/cortezaproject/corteza-server/pkg/errors" + "go.uber.org/zap" ) type ( - DefaultErrorHandler struct{} + DefaultErrorHandler struct { + log *zap.Logger + } ) -func (h DefaultErrorHandler) Exec(ctx context.Context, scope *Scp, err error) { - // set http status code - scope.Writer().WriteHeader(http.StatusInternalServerError) +func NewDefaultErrorHandler(log *zap.Logger) DefaultErrorHandler { + return DefaultErrorHandler{ + log: log, + } +} - // set body - scope.Writer().Write([]byte(err.Error())) +func (h DefaultErrorHandler) Handler() ErrorHandlerFunc { + return func(rw http.ResponseWriter, r *http.Request, err error) { + errors.ProperlyServeHTTP(rw, r, err, true) + h.log.Error(err.Error()) + } } diff --git a/pkg/apigw/types/handler.go b/pkg/apigw/types/handler.go index 00419b939a..b1a9276395 100644 --- a/pkg/apigw/types/handler.go +++ b/pkg/apigw/types/handler.go @@ -1,40 +1,30 @@ package types import ( - "context" - - atypes "github.com/cortezaproject/corteza-server/automation/types" - "github.com/cortezaproject/corteza-server/pkg/expr" + "net/http" ) type ( - Execer interface { - Exec(context.Context, *Scp) error - Type() FilterKind - } - - Sorter interface { - Weight() int - } - - ErrorHandler interface { - Exec(context.Context, *Scp, error) - } - Stringer interface { String() string } - WfExecer interface { - Exec(ctx context.Context, workflowID uint64, p atypes.WorkflowExecParams) (*expr.Vars, atypes.Stacktrace, error) + HTTPHandler interface { + Handler() HandlerFunc + } + + HTTPErrorHandler interface { + Handler() ErrorHandlerFunc } Handler interface { - Execer + HTTPHandler Stringer - Sorter Merge([]byte) (Handler, error) Meta() FilterMeta } + + HandlerFunc func(rw http.ResponseWriter, r *http.Request) error + ErrorHandlerFunc func(rw http.ResponseWriter, r *http.Request, err error) ) diff --git a/pkg/apigw/types/test.go b/pkg/apigw/types/test.go index ec0da46ab3..b4c812cd74 100644 --- a/pkg/apigw/types/test.go +++ b/pkg/apigw/types/test.go @@ -15,12 +15,13 @@ type ( Type_ func() FilterKind } - MockErrorExecer struct { - Exec_ func(context.Context, *Scp, error) + MockErrorHandler struct { + Handler_ ErrorHandlerFunc } MockHandler struct { - Foo string `json:"foo"` + Foo string `json:"foo"` + Handler_ HandlerFunc } MockStorer struct { @@ -35,16 +36,8 @@ func (h MockHandler) String() string { return "MockHandler" } -func (h MockHandler) Type() FilterKind { - return PreFilter -} - -func (h MockHandler) Weight() int { - return 0 -} - -func (h MockHandler) Exec(_ context.Context, _ *Scp) error { - panic("not implemented") // TODO: Implement +func (h MockHandler) Handler() HandlerFunc { + return h.Handler_ } func (h MockHandler) Merge(params []byte) (Handler, error) { @@ -66,24 +59,8 @@ func (td MockStorer) SearchApigwFilters(ctx context.Context, f st.ApigwFilterFil return td.F(ctx, f) } -func (me MockExecer) String() string { - return "MockExecer" -} - -func (h MockExecer) Type() FilterKind { - return PreFilter -} - -func (h MockExecer) Weight() int { - return 0 -} - -func (me MockExecer) Exec(ctx context.Context, s *Scp) (err error) { - return me.Exec_(ctx, s) -} - -func (me MockErrorExecer) Exec(ctx context.Context, s *Scp, e error) { - me.Exec_(ctx, s, e) +func (h MockErrorHandler) Handler() ErrorHandlerFunc { + return h.Handler_ } func (mrt MockRoundTripper) RoundTrip(rq *http.Request) (r *http.Response, err error) { diff --git a/pkg/expr/expr.go b/pkg/expr/expr.go index 4dea7617f5..20399ef727 100644 --- a/pkg/expr/expr.go +++ b/pkg/expr/expr.go @@ -46,7 +46,7 @@ func pathSplitter(data []byte, atEOF bool) (advance int, token []byte, err error return i + 1, data[start:i], nil } - if data[i+1] != '.' { + if data[i+1] != '.' && data[i+1] != '[' { return 0, nil, invalidPathErr } diff --git a/pkg/expr/expr_test.go b/pkg/expr/expr_test.go index 29ceecda4b..ddb3d9a1fa 100644 --- a/pkg/expr/expr_test.go +++ b/pkg/expr/expr_test.go @@ -19,6 +19,8 @@ func TestPathSplit(t *testing.T) { {p: "a.[]", err: invalidPathErr.Error()}, {p: "a[]", r: []string{"a", ""}}, {p: "a[1]bzz", err: invalidPathErr.Error()}, + {p: "a[b][c].d[1]", r: []string{"a", "b", "c", "d", "1"}}, + {p: "a.Content-Type", r: []string{"a", "Content-Type"}}, } for _, tc := range tcc { diff --git a/pkg/expr/expr_types_test.go b/pkg/expr/expr_types_test.go index e386fbb199..f42ae7de7f 100644 --- a/pkg/expr/expr_types_test.go +++ b/pkg/expr/expr_types_test.go @@ -353,13 +353,14 @@ func TestKVV_Assign(t *testing.T) { // Making sure http.Header is properly converted kvv = KVV{} - req.NoError(kvv.Assign(http.Header{"foo": []string{"bar"}})) - req.Contains(kvv.value, "foo") - req.Equal([]string{"bar"}, kvv.value["foo"]) + req.NoError(kvv.Assign(http.Header{"foo-bar": []string{"bar"}})) + req.Contains(kvv.value, "foo-bar") + req.Equal([]string{"bar"}, kvv.value["foo-bar"]) // Making sure url.Values are properly converted kvv = KVV{} req.NoError(kvv.Assign(url.Values{"foo": []string{"bar"}})) + req.Contains(kvv.value, "foo") req.Equal([]string{"bar"}, kvv.value["foo"]) @@ -372,7 +373,6 @@ func TestKVV_Assign(t *testing.T) { req.NoError(Assign(&kvv, "deep[3]", Must(NewString("b4z")))) req.Contains(kvv.value, "deep") req.Equal([]string{"bar", "baz", "bar", "b4z"}, kvv.value["deep"]) - } func TestKVV_Set(t *testing.T) { diff --git a/pkg/options/apigw.gen.go b/pkg/options/apigw.gen.go index 5be616becc..3b9592318e 100644 --- a/pkg/options/apigw.gen.go +++ b/pkg/options/apigw.gen.go @@ -16,6 +16,7 @@ type ( ApigwOpt struct { Enabled bool `env:"APIGW_ENABLED"` LogEnabled bool `env:"APIGW_LOG_ENABLED"` + LogRequestBody bool `env:"APIGW_LOG_REQUEST_BODY"` ProxyEnableDebugLog bool `env:"APIGW_PROXY_ENABLE_DEBUG_LOG"` ProxyFollowRedirects bool `env:"APIGW_PROXY_FOLLOW_REDIRECTS"` ProxyOutboundTimeout time.Duration `env:"APIGW_PROXY_OUTBOUND_TIMEOUT"` @@ -27,6 +28,7 @@ func Apigw() (o *ApigwOpt) { o = &ApigwOpt{ Enabled: true, LogEnabled: false, + LogRequestBody: false, ProxyEnableDebugLog: false, ProxyFollowRedirects: true, ProxyOutboundTimeout: time.Second * 30, diff --git a/pkg/options/apigw.yaml b/pkg/options/apigw.yaml index 6d047e0d16..2e08937b7a 100644 --- a/pkg/options/apigw.yaml +++ b/pkg/options/apigw.yaml @@ -17,6 +17,12 @@ props: description: |- Enable extra logging + - name: logRequestBody + type: bool + default: false + description: |- + Enable incoming request body output in logs + - name: proxyEnableDebugLog type: bool default: false