Skip to content

Commit

Permalink
fix: Remove bucket and mapping auto-creation from /write API
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartcarnie committed Nov 10, 2020
1 parent b643482 commit acd30c8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 318 deletions.
98 changes: 6 additions & 92 deletions http/legacy/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package legacy

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
Expand All @@ -21,11 +18,7 @@ import (
var _ http.Handler = (*WriteHandler)(nil)

const (
opPointsWriter = "http/v1PointsWriter"
opWriteHandler = "http/v1WriteHandler"

autoCreatedBucketDescription = "Auto-created from v1 db/rp mapping."
autoCreatedBucketRetentionPeriod = 3 * 24 * time.Hour
)

// PointsWriterBackend contains all the services needed to run a PointsWriterHandler.
Expand Down Expand Up @@ -62,7 +55,6 @@ type WriteHandler struct {
router *httprouter.Router
logger *zap.Logger
maxBatchSizeBytes int64
//parserOptions []models.ParserOption
}

// NewWriterHandler returns a new instance of PointsWriterHandler.
Expand Down Expand Up @@ -98,13 +90,6 @@ func WithMaxBatchSizeBytes(n int64) WriteHandlerOption {
}
}

//// WithParserOptions configures options for points parsing
//func WithParserOptions(opts ...models.ParserOption) WriteHandlerOption {
// return func(w *WriteHandler) {
// w.parserOptions = opts
// }
//}

// ServeHTTP implements http.Handler
func (h *WriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.router.ServeHTTP(w, r)
Expand Down Expand Up @@ -136,7 +121,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
return
}

bucket, err := h.findOrCreateMappedBucket(ctx, auth.OrgID, req.Database, req.RetentionPolicy)
bucket, err := h.findBucket(ctx, auth.OrgID, req.Database, req.RetentionPolicy)
if err != nil {
h.HandleHTTPError(ctx, err, sw)
return
Expand All @@ -162,25 +147,15 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// findOrCreateMappedBucket finds a DBRPMappingV2 for the database and
// retention policy combination. If the mapping doesn't exist, it will be
// created and bound to either an existing Bucket or a new one created for this
// purpose.
func (h *WriteHandler) findOrCreateMappedBucket(ctx context.Context, orgID influxdb.ID, db, rp string) (*influxdb.Bucket, error) {
// findBucket finds a bucket for the specified database and
// retention policy combination.
func (h *WriteHandler) findBucket(ctx context.Context, orgID influxdb.ID, db, rp string) (*influxdb.Bucket, error) {
mapping, err := h.findMapping(ctx, orgID, db, rp)
if err == nil {
return h.BucketService.FindBucketByID(ctx, mapping.BucketID)
}

if !isErrNotFound(err) {
return nil, err
}

bucket, err := h.mapToBucket(ctx, orgID, db, rp)
if err != nil {
return nil, err
}
return bucket, nil
return nil, err
}

// findMapping finds a DBRPMappingV2 for the database and retention policy
Expand All @@ -207,62 +182,6 @@ func (h *WriteHandler) findMapping(ctx context.Context, orgID influxdb.ID, db, r
return mappings[0], nil
}

// createMapping creates a DBRPMappingV2 for the database and retention policy
// combination.
func (h *WriteHandler) createMapping(ctx context.Context, orgID, bucketID influxdb.ID, db, rp string) error {
return h.DBRPMappingService.Create(ctx, &influxdb.DBRPMappingV2{
OrganizationID: orgID,
BucketID: bucketID,
Database: db,
RetentionPolicy: rp,
})
}

// mapToBucket creates a new DBRPMappingV2 to either an existing Bucket (if it
// can find it) or a new one it creates for this purpose.
func (h *WriteHandler) mapToBucket(ctx context.Context, orgID influxdb.ID, db, rp string) (*influxdb.Bucket, error) {
if rp == "" {
rp = "autogen"
}

name := fmt.Sprintf("%s/%s", db, rp)
bucket, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
OrganizationID: &orgID,
Name: &name,
})
if err == nil {
if err := h.createMapping(ctx, orgID, bucket.ID, db, rp); err != nil {
return nil, err
}
return bucket, nil
}
if !isErrNotFound(err) {
return nil, err
}

now := time.Now().UTC()
bucket = &influxdb.Bucket{
Type: influxdb.BucketTypeUser,
Name: name,
Description: autoCreatedBucketDescription,
OrgID: orgID,
RetentionPolicyName: rp,
RetentionPeriod: autoCreatedBucketRetentionPeriod,
CRUDLog: influxdb.CRUDLog{
CreatedAt: now,
UpdatedAt: now,
},
}
err = h.BucketService.CreateBucket(ctx, bucket)
if err != nil {
return nil, err
}
if err := h.createMapping(ctx, orgID, bucket.ID, db, rp); err != nil {
return nil, err
}
return bucket, nil
}

// writeRequest is a transport-agnostic write request. It holds all inputs for
// processing a v1 write request.
type writeRequest struct {
Expand All @@ -275,7 +194,7 @@ type writeRequest struct {

// decodeWriteRequest extracts write request information from an inbound
// http.Request and returns a writeRequest.
func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes int64) (*writeRequest, error) {
func decodeWriteRequest(_ context.Context, r *http.Request, maxBatchSizeBytes int64) (*writeRequest, error) {
qp := r.URL.Query()
precision := qp.Get("precision")
if precision == "" {
Expand Down Expand Up @@ -303,8 +222,3 @@ func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes
Body: body,
}, nil
}

func isErrNotFound(err error) bool {
var idErr *influxdb.Error
return errors.As(err, &idErr) && idErr.Code == influxdb.ENotFound
}
Loading

0 comments on commit acd30c8

Please sign in to comment.