Skip to content

Commit

Permalink
Fix remaining comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Feb 8, 2021
1 parent 7ae2d0b commit f414266
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 20 deletions.
6 changes: 5 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ ha_tracker:
[mirror_timeout: <duration> | default = 2s]
# remote_write API max receive message size (bytes).
# CLI flag: -distributor.max-recv-msg-size
# CLI flag: -distributor.max-recv-body-size
[max_recv_msg_size: <int> | default = 104857600]
# Timeout for downstream ingesters.
Expand Down Expand Up @@ -1542,6 +1542,10 @@ The `alertmanager_config` configures the Cortex alertmanager.
# CLI flag: -alertmanager.configs.poll-interval
[poll_interval: <duration> | default = 15s]
# Max receive http body size (bytes).
# CLI flag: -alertmanager.max-recv-msg-size
[max_recv_msg_size: <int> | default = 16777216]
# Deprecated. Use -alertmanager.cluster.listen-address instead.
# CLI flag: -cluster.listen-address
[cluster_bind_address: <string> | default = "0.0.0.0:9094"]
Expand Down
21 changes: 15 additions & 6 deletions pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func (d *Distributor) doWrite(userID string, w http.ResponseWriter, r *http.Requ
if r.Body != nil {
body, err = ioutil.ReadAll(r.Body)
if err != nil {
if err.Error() == "http: request body too large\n" {
http.Error(w, "Request body too large", http.StatusBadRequest)
}
level.Error(logger).Log("msg", "failed to read the request body during write", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
Expand All @@ -110,9 +113,8 @@ func (d *Distributor) doWrite(userID string, w http.ResponseWriter, r *http.Requ
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.AlertmanagerClient.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(r.Context()); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
sp, localCtx := opentracing.StartSpanFromContext(localCtx, "DistributeRequest.doWrite")
defer sp.Finish()

resp, err := d.doRequest(localCtx, am, &httpgrpc.HTTPRequest{
Method: r.Method,
Expand Down Expand Up @@ -141,8 +143,12 @@ func (d *Distributor) doWrite(userID string, w http.ResponseWriter, r *http.Requ
respondFromError(err, w, logger)
}

if firstSuccessfulResponse != nil {
respondFromHTTPGRPCResponse(w, firstSuccessfulResponse)
respMtx.Lock() // Another request might be ongoing after quorum.
resp := firstSuccessfulResponse
respMtx.Unlock()

if resp != nil {
respondFromHTTPGRPCResponse(w, resp)
} else {
// This should not happen.
level.Error(logger).Log("msg", "distributor did not receive response from alertmanager though no errors")
Expand All @@ -168,6 +174,9 @@ func (d *Distributor) doRead(userID string, w http.ResponseWriter, r *http.Reque

ctx, cancel := context.WithTimeout(r.Context(), d.cfg.AlertmanagerClient.RemoteTimeout)
defer cancel()

sp, ctx := opentracing.StartSpanFromContext(ctx, "DistributeRequest.doRead")
defer sp.Finish()
// Until we have a mechanism to combine the results from multiple alertmanagers,
// we forward the request to only only of the alertmanagers.
amDesc := replicationSet.Ingesters[rand.Intn(len(replicationSet.Ingesters))]
Expand Down Expand Up @@ -197,7 +206,7 @@ func respondFromHTTPGRPCResponse(w http.ResponseWriter, httpResp *httpgrpc.HTTPR
}
}
w.WriteHeader(int(httpResp.Code))
w.Write(httpResp.Body)
w.Write(httpResp.Body) //nolint
}

func (d *Distributor) doRequest(ctx context.Context, am ring.InstanceDesc, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
Expand Down
12 changes: 7 additions & 5 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"flag"
"fmt"
"github.com/weaveworks/common/httpgrpc"
"hash/fnv"
"html/template"
"io/ioutil"
Expand All @@ -22,6 +21,7 @@ import (
amconfig "github.com/prometheus/alertmanager/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"

"github.com/cortexproject/cortex/pkg/alertmanager/alerts"
Expand Down Expand Up @@ -92,10 +92,11 @@ func init() {

// MultitenantAlertmanagerConfig is the configuration for a multitenant Alertmanager.
type MultitenantAlertmanagerConfig struct {
DataDir string `yaml:"data_dir"`
Retention time.Duration `yaml:"retention"`
ExternalURL flagext.URLValue `yaml:"external_url"`
PollInterval time.Duration `yaml:"poll_interval"`
DataDir string `yaml:"data_dir"`
Retention time.Duration `yaml:"retention"`
ExternalURL flagext.URLValue `yaml:"external_url"`
PollInterval time.Duration `yaml:"poll_interval"`
MaxRecvMsgSize int64 `yaml:"max_recv_msg_size"`

DeprecatedClusterBindAddr string `yaml:"cluster_bind_address"`
DeprecatedClusterAdvertiseAddr string `yaml:"cluster_advertise_address"`
Expand Down Expand Up @@ -136,6 +137,7 @@ const (
func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.DataDir, "alertmanager.storage.path", "data/", "Base path for data storage.")
f.DurationVar(&cfg.Retention, "alertmanager.storage.retention", 5*24*time.Hour, "How long to keep data for.")
f.Int64Var(&cfg.MaxRecvMsgSize, "alertmanager.max-recv-msg-size", 16<<20, "Max receive http body size (bytes).")

f.Var(&cfg.ExternalURL, "alertmanager.web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.")

Expand Down
13 changes: 7 additions & 6 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/push"
)

Expand Down Expand Up @@ -158,7 +159,7 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth

// RegisterAlertmanager registers endpoints associated with the alertmanager. It will only
// serve endpoints using the legacy http-prefix if it is not run as a single binary.
func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, target, apiEnabled bool) {
func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, amCfg alertmanager.MultitenantAlertmanagerConfig, target, apiEnabled bool) {
alertmanagerpb.RegisterAlertmanagerServer(a.server.GRPC, am)

a.indexPage.AddLink(SectionAdminEndpoints, "/multitenant_alertmanager/status", "Alertmanager Status")
Expand All @@ -168,21 +169,21 @@ func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, tar
a.RegisterRoute("/multitenant_alertmanager/ring", http.HandlerFunc(am.RingHandler), false, "GET", "POST")

// UI components lead to a large number of routes to support, utilize a path prefix instead
a.RegisterRoutesWithPrefix(a.cfg.AlertmanagerHTTPPrefix, am, true)
a.RegisterRoutesWithPrefix(a.cfg.AlertmanagerHTTPPrefix, util.NewMaxBytesHandler(am, amCfg.MaxRecvMsgSize), true)
level.Debug(a.logger).Log("msg", "api: registering alertmanager", "path_prefix", a.cfg.AlertmanagerHTTPPrefix)

// If the target is Alertmanager, enable the legacy behaviour. Otherwise only enable
// the component routed API.
if target {
a.RegisterRoute("/status", am.GetStatusHandler(), false, "GET")
a.RegisterRoutesWithPrefix(a.cfg.LegacyHTTPPrefix, am, true)
a.RegisterRoutesWithPrefix(a.cfg.LegacyHTTPPrefix, util.NewMaxBytesHandler(am, amCfg.MaxRecvMsgSize), true)
}

// MultiTenant Alertmanager Experimental API routes
if apiEnabled {
a.RegisterRoute("/api/v1/alerts", http.HandlerFunc(am.GetUserConfig), true, "GET")
a.RegisterRoute("/api/v1/alerts", http.HandlerFunc(am.SetUserConfig), true, "POST")
a.RegisterRoute("/api/v1/alerts", http.HandlerFunc(am.DeleteUserConfig), true, "DELETE")
a.RegisterRoute("/api/v1/alerts", util.NewMaxBytesHandler(http.HandlerFunc(am.GetUserConfig), amCfg.MaxRecvMsgSize), true, "GET")
a.RegisterRoute("/api/v1/alerts", util.NewMaxBytesHandler(http.HandlerFunc(am.SetUserConfig), amCfg.MaxRecvMsgSize), true, "POST")
a.RegisterRoute("/api/v1/alerts", util.NewMaxBytesHandler(http.HandlerFunc(am.DeleteUserConfig), amCfg.MaxRecvMsgSize), true, "DELETE")
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
return
}

t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI)
t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI)
return t.Alertmanager, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.HATrackerConfig.RegisterFlags(f)
cfg.DistributorRing.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-body-size", 100<<20, "remote_write API max receive message size (bytes).")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
Expand Down
21 changes: 21 additions & 0 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,24 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi
}
return nil
}

type MaxBytesHandler struct {
h http.Handler
maxBytes int64
}

// NewMaxBytesHandler returns a MaxBytesHandler.
// If maxBytes<0, then the max bytes is not used and the passed handler is returned back.
func NewMaxBytesHandler(h http.Handler, maxBytes int64) http.Handler {
if maxBytes < 0 {
return h
}
return &MaxBytesHandler{h: h, maxBytes: maxBytes}
}

func (h *MaxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, h.maxBytes)
if h.h != nil {
h.h.ServeHTTP(w, r)
}
}
60 changes: 60 additions & 0 deletions pkg/util/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"context"
"html/template"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

Expand Down Expand Up @@ -157,3 +159,61 @@ func (b bytesBuffered) Close() error {
func (b bytesBuffered) BytesBuffer() *bytes.Buffer {
return b.Buffer
}

func TestMaxBytesHandler(t *testing.T) {
cases := []struct {
inpBody []byte
maxSize int64
expBody string
expStatusCode int
}{
{
inpBody: []byte{1, 2, 3, 4},
maxSize: 4,
expBody: "all is well",
expStatusCode: http.StatusOK,
}, {
inpBody: []byte{1, 2, 3, 4},
maxSize: 3,
expBody: "http: request body too large\n",
expStatusCode: http.StatusBadRequest,
}, {
inpBody: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9},
maxSize: -1,
expBody: "all is well",
expStatusCode: http.StatusOK,
}, {
inpBody: []byte{1},
maxSize: 0,
expBody: "http: request body too large\n",
expStatusCode: http.StatusBadRequest,
},
}
for _, c := range cases {
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1/all-is-well", bytes.NewReader(c.inpBody))
assert.NoError(t, err)

h := util.NewMaxBytesHandler(&mockHandler{}, c.maxSize)
w := httptest.NewRecorder()
h.ServeHTTP(w, req)

assert.Equal(t, c.expStatusCode, w.Code)

b, err := ioutil.ReadAll(w.Body)
assert.NoError(t, err)
assert.Equal(t, c.expBody, string(b))
}
}

type mockHandler struct {
http.Handler
}

func (mockHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
_, err := ioutil.ReadAll(req.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Write([]byte("all is well")) //nolint
}

0 comments on commit f414266

Please sign in to comment.