Skip to content

Commit

Permalink
feat(handler): Add 2.0 compatible write endpoint
Browse files Browse the repository at this point in the history
This commit adds a /v2/write endpoint that maps the supplied bucket and
org to a v1 database and retention policy.

fixes #16898
  • Loading branch information
ayang64 committed Feb 18, 2020
1 parent 6b07ed7 commit fc89d9c
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ func NewHandler(c Config) *Handler {
},
Route{
"write", // Data-ingest route.
"POST", "/write", true, writeLogEnabled, h.serveWrite,
"POST", "/write", true, writeLogEnabled, h.serveWriteV1,
},
Route{
"write", // Data-ingest route.
"POST", "/v2/write", true, writeLogEnabled, h.serveWriteV2,
},
Route{
"prometheus-write", // Prometheus remote write
Expand Down Expand Up @@ -772,8 +776,21 @@ func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) {
}
}

// serveWrite receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
// serveWriteV2 maps v2 write parameters to a v1 style handler. the concepts
// of an "org" and "bucket" are mapped to v1 "database" and "retention
// policies".
func (h *Handler) serveWriteV2(w http.ResponseWriter, r *http.Request, user meta.User) {
h.serveWrite(r.URL.Query().Get("org"), r.URL.Query().Get("bucket"), w, r, user)
}

// serveWriteV1 handles v1 style writes.
func (h *Handler) serveWriteV1(w http.ResponseWriter, r *http.Request, user meta.User) {
h.serveWrite(r.URL.Query().Get("db"), r.URL.Query().Get("rp"), w, r, user)
}

// serveWrite receives incoming series data in line protocol format and writes
// it to the database.
func (h *Handler) serveWrite(database string, retentionPolicy string, w http.ResponseWriter, r *http.Request, user meta.User) {
atomic.AddInt64(&h.stats.WriteRequests, 1)
atomic.AddInt64(&h.stats.ActiveWriteRequests, 1)
defer func(start time.Time) {
Expand All @@ -782,7 +799,6 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U
}(time.Now())
h.requestTracker.Add(r, user)

database := r.URL.Query().Get("db")
if database == "" {
h.httpError(w, "database is required", http.StatusBadRequest)
return
Expand Down Expand Up @@ -877,7 +893,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U
}

// Write points.
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) {
if err := h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points); influxdb.IsClientError(err) {
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
h.httpError(w, err.Error(), http.StatusBadRequest)
return
Expand Down

0 comments on commit fc89d9c

Please sign in to comment.