Skip to content

Commit

Permalink
Fix Peter's and Josh's comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Feb 17, 2021
1 parent 92657d3 commit 6a39e95
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 124 deletions.
37 changes: 25 additions & 12 deletions pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
)

// Distributor forwards requests to individual alertmanagers.
type Distributor struct {
services.Service

cfg *MultitenantAlertmanagerConfig
cfg ClientConfig
maxRecvMsgSize int64
requestsInFlight sync.WaitGroup

alertmanagerRing ring.ReadRing
Expand All @@ -39,14 +40,15 @@ type Distributor struct {
}

// NewDistributor constructs a new Distributor
func NewDistributor(cfg *MultitenantAlertmanagerConfig, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
if alertmanagerClientsPool == nil {
alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(alertmanagersRing), cfg.AlertmanagerClient, logger, reg)
alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(alertmanagersRing), cfg, logger, reg)
}

d = &Distributor{
cfg: cfg,
logger: logger,
maxRecvMsgSize: maxRecvMsgSize,
alertmanagerRing: alertmanagersRing,
alertmanagerClientsPool: alertmanagerClientsPool,
}
Expand All @@ -70,7 +72,7 @@ func (d *Distributor) IsPathSupported(path string) bool {

// DistributeRequest shards the writes and returns as soon as the quorum is satisfied.
// In case of reads, it proxies the request to one of the alertmanagers.
// DistributeRequest assumes that the caller has verified IsPathSupported return
// DistributeRequest assumes that the caller has verified IsPathSupported returns
// true for the route.
func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request) {
d.requestsInFlight.Add(1)
Expand All @@ -82,7 +84,7 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
return
}

logger := log.With(d.logger, "user", userID)
logger := util_log.WithContext(r.Context(), d.logger)

if r.Method == http.MethodGet || r.Method == http.MethodHead {
d.doRead(userID, w, r, logger)
Expand All @@ -95,10 +97,11 @@ func (d *Distributor) doWrite(userID string, w http.ResponseWriter, r *http.Requ
var body []byte
var err error
if r.Body != nil {
body, err = ioutil.ReadAll(r.Body)
body, err = ioutil.ReadAll(http.MaxBytesReader(w, r.Body, d.maxRecvMsgSize))
if err != nil {
if errors.Is(util.ErrRequestBodyTooLarge, err) {
if util.IsRequestBodyTooLarge(err) {
http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge)
return
}
level.Error(logger).Log("msg", "failed to read the request body during write", "err", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -150,7 +153,7 @@ func (d *Distributor) doWrite(userID string, w http.ResponseWriter, r *http.Requ
respondFromHTTPGRPCResponse(w, resp)
} else {
// This should not happen.
level.Error(logger).Log("msg", "distributor did not receive response from alertmanager though no errors")
level.Error(logger).Log("msg", "distributor did not receive any response from alertmanagers, but there were no errors")
w.WriteHeader(http.StatusInternalServerError)
}
}
Expand All @@ -164,12 +167,22 @@ func (d *Distributor) doRead(userID string, w http.ResponseWriter, r *http.Reque
return
}

req, err := server.HTTPRequest(r)
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, d.maxRecvMsgSize))
if err != nil {
level.Error(logger).Log("msg", "failed to get grpc request from http request", "err", err)
if util.IsRequestBodyTooLarge(err) {
http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge)
return
}
level.Error(logger).Log("msg", "failed to read the request body during read", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
req := &httpgrpc.HTTPRequest{
Method: r.Method,
Url: r.RequestURI,
Body: body,
Headers: httpToHttpgrpcHeaders(r.Header),
}

sp, ctx := opentracing.StartSpanFromContext(r.Context(), "Distributor.doRead")
defer sp.Finish()
Expand Down Expand Up @@ -206,7 +219,7 @@ func respondFromHTTPGRPCResponse(w http.ResponseWriter, httpResp *httpgrpc.HTTPR
}

func (d *Distributor) doRequest(ctx context.Context, am ring.InstanceDesc, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
ctx, cancel := context.WithTimeout(ctx, d.cfg.AlertmanagerClient.RemoteTimeout)
ctx, cancel := context.WithTimeout(ctx, d.cfg.RemoteTimeout)
defer cancel()
amClient, err := d.alertmanagerClientsPool.GetClientFor(am.Addr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int) (*Distribut
cfg := &MultitenantAlertmanagerConfig{}
flagext.DefaultValues(cfg)

d, err := NewDistributor(cfg, amRing, newMockAlertmanagerClientFactory(amByAddr), util_log.Logger, prometheus.NewRegistry())
d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), util_log.Logger, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))

Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC

am.grpcServer = server.NewServer(&handlerForGRPCServer{am: am})

am.distributor, err = NewDistributor(cfg, am.ring, nil, log.With(logger, "component", "AlertmanagerDistributor"), am.registry)
am.distributor, err = NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, am.ring, nil, log.With(logger, "component", "AlertmanagerDistributor"), am.registry)
if err != nil {
return nil, errors.Wrap(err, "create distributor")
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/push"
)

Expand Down Expand Up @@ -169,21 +168,21 @@ func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, amC
a.RegisterRoute("/multitenant_alertmanager/ring", http.HandlerFunc(am.RingHandler), false, "GET", "POST")

// UI components lead to a large number of routes to support, utilize a path prefix instead
a.RegisterRoutesWithPrefix(a.cfg.AlertmanagerHTTPPrefix, util.NewMaxBytesHandler(am, amCfg.MaxRecvMsgSize), true)
a.RegisterRoutesWithPrefix(a.cfg.AlertmanagerHTTPPrefix, am, true)
level.Debug(a.logger).Log("msg", "api: registering alertmanager", "path_prefix", a.cfg.AlertmanagerHTTPPrefix)

// If the target is Alertmanager, enable the legacy behaviour. Otherwise only enable
// the component routed API.
if target {
a.RegisterRoute("/status", am.GetStatusHandler(), false, "GET")
a.RegisterRoutesWithPrefix(a.cfg.LegacyHTTPPrefix, util.NewMaxBytesHandler(am, amCfg.MaxRecvMsgSize), true)
a.RegisterRoutesWithPrefix(a.cfg.LegacyHTTPPrefix, am, true)
}

// MultiTenant Alertmanager Experimental API routes
if apiEnabled {
a.RegisterRoute("/api/v1/alerts", util.NewMaxBytesHandler(http.HandlerFunc(am.GetUserConfig), amCfg.MaxRecvMsgSize), true, "GET")
a.RegisterRoute("/api/v1/alerts", util.NewMaxBytesHandler(http.HandlerFunc(am.SetUserConfig), amCfg.MaxRecvMsgSize), true, "POST")
a.RegisterRoute("/api/v1/alerts", util.NewMaxBytesHandler(http.HandlerFunc(am.DeleteUserConfig), amCfg.MaxRecvMsgSize), true, "DELETE")
a.RegisterRoute("/api/v1/alerts", http.HandlerFunc(am.GetUserConfig), true, "GET")
a.RegisterRoute("/api/v1/alerts", http.HandlerFunc(am.SetUserConfig), true, "POST")
a.RegisterRoute("/api/v1/alerts", http.HandlerFunc(am.DeleteUserConfig), true, "DELETE")
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down Expand Up @@ -201,7 +202,7 @@ func writeError(w http.ResponseWriter, err error) {
case context.DeadlineExceeded:
err = errDeadlineExceeded
default:
if strings.Contains(err.Error(), "http: request body too large") {
if util.IsRequestBodyTooLarge(err) {
err = errRequestEntityTooLarge
}
}
Expand Down
32 changes: 3 additions & 29 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@ import (

const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)"

var ErrRequestBodyTooLarge = &errRequestBodyTooLarge{}

type errRequestBodyTooLarge struct{}

func (errRequestBodyTooLarge) Error() string { return "http: request body too large" }

func (errRequestBodyTooLarge) Is(err error) bool {
return err.Error() == "http: request body too large"
// IsRequestBodyTooLarge returns true if the error is "http: request body too large".
func IsRequestBodyTooLarge(err error) bool {
return err != nil && strings.Contains(err.Error(), "http: request body too large")
}

// BasicAuth configures basic authentication for HTTP clients.
Expand Down Expand Up @@ -245,24 +240,3 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi
}
return nil
}

type MaxBytesHandler struct {
h http.Handler
maxBytes int64
}

// NewMaxBytesHandler returns a MaxBytesHandler.
// If maxBytes<0, then the max bytes is not used and the passed handler is returned back.
func NewMaxBytesHandler(h http.Handler, maxBytes int64) http.Handler {
if maxBytes < 0 {
return h
}
return &MaxBytesHandler{h: h, maxBytes: maxBytes}
}

func (h *MaxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, h.maxBytes)
if h.h != nil {
h.h.ServeHTTP(w, r)
}
}
77 changes: 3 additions & 74 deletions pkg/util/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package util_test
import (
"bytes"
"context"
"errors"
"html/template"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -161,77 +160,7 @@ func (b bytesBuffered) BytesBuffer() *bytes.Buffer {
return b.Buffer
}

func TestMaxBytesHandler(t *testing.T) {
cases := []struct {
inpBody []byte
maxSize int64
expBody string
expStatusCode int
}{
{
inpBody: []byte{1, 2, 3, 4},
maxSize: 4,
expBody: "all is well",
expStatusCode: http.StatusOK,
}, {
inpBody: []byte{1, 2, 3, 4},
maxSize: 3,
expBody: util.ErrRequestBodyTooLarge.Error(),
expStatusCode: http.StatusBadRequest,
}, {
inpBody: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9},
maxSize: -1,
expBody: "all is well",
expStatusCode: http.StatusOK,
}, {
inpBody: []byte{1},
maxSize: 0,
expBody: util.ErrRequestBodyTooLarge.Error(),
expStatusCode: http.StatusBadRequest,
},
}
for _, c := range cases {
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1/all-is-well", bytes.NewReader(c.inpBody))
assert.NoError(t, err)

h := util.NewMaxBytesHandler(&mockHandler{}, c.maxSize)
w := httptest.NewRecorder()
h.ServeHTTP(w, req)

assert.Equal(t, c.expStatusCode, w.Code)

b, err := ioutil.ReadAll(w.Body)
assert.NoError(t, err)
assert.Equal(t, c.expBody, string(b))
}
}

type mockHandler struct {
http.Handler
}

func (mockHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
_, err := ioutil.ReadAll(req.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error())) //nolint:errcheck
return
}
w.Write([]byte("all is well")) //nolint:errcheck
}

func TestErrRequestBodyTooLargeRegression(t *testing.T) {
var handlerErr error
handler := func(w http.ResponseWriter, req *http.Request) {
_, handlerErr = ioutil.ReadAll(req.Body)
w.WriteHeader(http.StatusOK)
}

h := util.NewMaxBytesHandler(http.HandlerFunc(handler), 1)

req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1/test", bytes.NewReader([]byte{1, 2, 3, 4}))
assert.NoError(t, err)
h.ServeHTTP(httptest.NewRecorder(), req)

assert.True(t, errors.Is(util.ErrRequestBodyTooLarge, handlerErr))
func TestIsRequestBodyTooLargeRegression(t *testing.T) {
_, err := ioutil.ReadAll(http.MaxBytesReader(httptest.NewRecorder(), ioutil.NopCloser(bytes.NewReader([]byte{1, 2, 3, 4})), 1))
assert.True(t, util.IsRequestBodyTooLarge(err))
}

0 comments on commit 6a39e95

Please sign in to comment.