Skip to content

Commit

Permalink
test(http): add unit tests for /api/v2/write
Browse files Browse the repository at this point in the history
We have been tracking down odd error messages when writing data and
found the problem to be internal server errors when writing empty
bodies.

I added fairly comprehensive test coverage for /api/v2/write as well
as simplify and clarify the error messages.
  • Loading branch information
goller committed Sep 12, 2019
1 parent dffca8f commit bb72fde
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 70 deletions.
30 changes: 18 additions & 12 deletions context/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
)

type contextKey string
Expand All @@ -14,17 +14,23 @@ const (
)

// SetAuthorizer sets an authorizer on context.
func SetAuthorizer(ctx context.Context, a platform.Authorizer) context.Context {
func SetAuthorizer(ctx context.Context, a influxdb.Authorizer) context.Context {
return context.WithValue(ctx, authorizerCtxKey, a)
}

// GetAuthorizer retrieves an authorizer from context.
func GetAuthorizer(ctx context.Context) (platform.Authorizer, error) {
a, ok := ctx.Value(authorizerCtxKey).(platform.Authorizer)
func GetAuthorizer(ctx context.Context) (influxdb.Authorizer, error) {
a, ok := ctx.Value(authorizerCtxKey).(influxdb.Authorizer)
if !ok {
return nil, &platform.Error{
return nil, &influxdb.Error{
Msg: "authorizer not found on context",
Code: platform.EInternal,
Code: influxdb.EInternal,
}
}
if a == nil {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Msg: "unexpected invalid authorizer",
}
}

Expand All @@ -33,19 +39,19 @@ func GetAuthorizer(ctx context.Context) (platform.Authorizer, error) {

// GetToken retrieves a token from the context; errors if no token.
func GetToken(ctx context.Context) (string, error) {
a, ok := ctx.Value(authorizerCtxKey).(platform.Authorizer)
a, ok := ctx.Value(authorizerCtxKey).(influxdb.Authorizer)
if !ok {
return "", &platform.Error{
return "", &influxdb.Error{
Msg: "authorizer not found on context",
Code: platform.EInternal,
Code: influxdb.EInternal,
}
}

auth, ok := a.(*platform.Authorization)
auth, ok := a.(*influxdb.Authorization)
if !ok {
return "", &platform.Error{
return "", &influxdb.Error{
Msg: fmt.Sprintf("authorizer not an authorization but a %T", a),
Code: platform.EInternal,
Code: influxdb.EInternal,
}
}

Expand Down
12 changes: 6 additions & 6 deletions http/authentication_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,16 @@ func (h *AuthenticationHandler) extractSession(ctx context.Context, r *http.Requ
return ctx, err
}

s, e := h.SessionService.FindSession(ctx, k)
if e != nil {
return ctx, e
s, err := h.SessionService.FindSession(ctx, k)
if err != nil {
return ctx, err
}

if !h.SessionRenewDisabled {
// if the session is not expired, renew the session
e = h.SessionService.RenewSession(ctx, s, time.Now().Add(platform.RenewSessionTime))
if e != nil {
return ctx, e
err = h.SessionService.RenewSession(ctx, s, time.Now().Add(platform.RenewSessionTime))
if err != nil {
return ctx, err
}
}

Expand Down
6 changes: 6 additions & 0 deletions http/metric/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ type Event struct {
ResponseBytes int
Status int
}

// NopEventRecorder never records events.
type NopEventRecorder struct{}

// Record never records events.
func (n *NopEventRecorder) Record(ctx context.Context, e Event) {}
6 changes: 6 additions & 0 deletions http/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ const (
)

// queryOrganization returns the organization for any http request.
//
// It checks the org= and then orgID= parameter of the request.
//
// This will try to find the organization using an ID string or
// the name. It interprets the &org= parameter as either the name
// or the ID.
func queryOrganization(ctx context.Context, r *http.Request, svc platform.OrganizationService) (o *platform.Organization, err error) {

filter := platform.OrganizationFilter{}
Expand Down
6 changes: 3 additions & 3 deletions http/session_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type signoutRequest struct {
Key string
}

func decodeSignoutRequest(ctx context.Context, r *http.Request) (*signoutRequest, *platform.Error) {
func decodeSignoutRequest(ctx context.Context, r *http.Request) (*signoutRequest, error) {
key, err := decodeCookieSession(ctx, r)
if err != nil {
return nil, err
Expand All @@ -145,12 +145,12 @@ func encodeCookieSession(w http.ResponseWriter, s *platform.Session) {

http.SetCookie(w, c)
}
func decodeCookieSession(ctx context.Context, r *http.Request) (string, *platform.Error) {
func decodeCookieSession(ctx context.Context, r *http.Request) (string, error) {
c, err := r.Cookie(cookieSessionName)
if err != nil {
return "", &platform.Error{
Err: err,
Code: platform.EInvalid,
Err: err,
}
}
return c.Value, nil
Expand Down
90 changes: 47 additions & 43 deletions http/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"

platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/models"
Expand All @@ -24,13 +24,13 @@ import (
// WriteBackend is all services and associated parameters required to construct
// the WriteHandler.
type WriteBackend struct {
platform.HTTPErrorHandler
influxdb.HTTPErrorHandler
Logger *zap.Logger
WriteEventRecorder metric.EventRecorder

PointsWriter storage.PointsWriter
BucketService platform.BucketService
OrganizationService platform.OrganizationService
BucketService influxdb.BucketService
OrganizationService influxdb.OrganizationService
}

// NewWriteBackend returns a new instance of WriteBackend.
Expand All @@ -49,11 +49,11 @@ func NewWriteBackend(b *APIBackend) *WriteBackend {
// WriteHandler receives line protocol and sends to a publish function.
type WriteHandler struct {
*httprouter.Router
platform.HTTPErrorHandler
influxdb.HTTPErrorHandler
Logger *zap.Logger

BucketService platform.BucketService
OrganizationService platform.OrganizationService
BucketService influxdb.BucketService
OrganizationService influxdb.OrganizationService

PointsWriter storage.PointsWriter

Expand Down Expand Up @@ -92,7 +92,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {

// TODO(desa): I really don't like how we're recording the usage metrics here
// Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403
var orgID platform.ID
var orgID influxdb.ID
var requestBytes int
sw := newStatusResponseWriter(w)
w = sw
Expand All @@ -111,8 +111,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
var err error
in, err = gzip.NewReader(r.Body)
if err != nil {
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInvalid,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/handleWrite",
Msg: errInvalidGzipHeader,
Err: err,
Expand All @@ -136,7 +136,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {

logger := h.Logger.With(zap.String("org", req.Org), zap.String("bucket", req.Bucket))

var org *platform.Organization
var org *influxdb.Organization
org, err = queryOrganization(ctx, r, h.OrganizationService)
if err != nil {
logger.Info("Failed to find organization", zap.Error(err))
Expand All @@ -146,31 +146,28 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {

orgID = org.ID

var bucket *platform.Bucket
if id, err := platform.IDFromString(req.Bucket); err == nil {
var bucket *influxdb.Bucket
if id, err := influxdb.IDFromString(req.Bucket); err == nil {
// Decoded ID successfully. Make sure it's a real bucket.
b, err := h.BucketService.FindBucket(ctx, platform.BucketFilter{
b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
OrganizationID: &org.ID,
ID: id,
})
if err == nil {
bucket = b
} else if platform.ErrorCode(err) != platform.ENotFound {
} else if influxdb.ErrorCode(err) != influxdb.ENotFound {
h.HandleHTTPError(ctx, err, w)
return
}
}

if bucket == nil {
b, err := h.BucketService.FindBucket(ctx, platform.BucketFilter{
b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
OrganizationID: &org.ID,
Name: &req.Bucket,
})
if err != nil {
h.HandleHTTPError(ctx, &platform.Error{
Op: "http/handleWrite",
Err: err,
}, w)
h.HandleHTTPError(ctx, err, w)
return
}

Expand All @@ -179,18 +176,18 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {

// TODO(jade): remove this after system buckets issue is resolved
if bucket.IsSystem() {
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EForbidden,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EForbidden,
Op: "http/handleWrite",
Msg: fmt.Sprintf("cannot write to internal bucket %s", bucket.Name),
}, w)
return
}

p, err := platform.NewPermissionAtID(bucket.ID, platform.WriteAction, platform.BucketsResourceType, org.ID)
p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID)
if err != nil {
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInternal,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to create permission for bucket: %v", err),
Err: err,
Expand All @@ -199,8 +196,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
}

if !a.Allowed(*p) {
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EForbidden,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EForbidden,
Op: "http/handleWrite",
Msg: "insufficient permissions for write",
}, w)
Expand All @@ -213,36 +210,43 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(in)
if err != nil {
logger.Error("Error reading body", zap.Error(err))
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInternal,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to read data: %v", err),
Err: err,
}, w)
return
}

requestBytes = len(data)
if requestBytes == 0 {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/handleWrite",
Msg: "writing requires points",
}, w)
return
}

encoded := tsdb.EncodeName(org.ID, bucket.ID)
mm := models.EscapeMeasurement(encoded[:])
points, err := models.ParsePointsWithPrecision(data, mm, time.Now(), req.Precision)
if err != nil {
logger.Error("Error parsing points", zap.Error(err))
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInvalid,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to parse points: %v", err),
Err: err,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: err.Error(),
}, w)
return
}

if err := h.PointsWriter.WritePoints(ctx, points); err != nil {
logger.Error("Error writing points", zap.Error(err))
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInternal,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to write points to database: %v", err),
Msg: "unexpected error writing points to database",
Err: err,
}, w)
return
Expand All @@ -259,8 +263,8 @@ func decodeWriteRequest(ctx context.Context, r *http.Request) (*postWriteRequest
}

if !models.ValidPrecision(p) {
return nil, &platform.Error{
Code: platform.EInvalid,
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/decodeWriteRequest",
Msg: errInvalidPrecision,
}
Expand All @@ -287,17 +291,17 @@ type WriteService struct {
InsecureSkipVerify bool
}

var _ platform.WriteService = (*WriteService)(nil)
var _ influxdb.WriteService = (*WriteService)(nil)

func (s *WriteService) Write(ctx context.Context, orgID, bucketID platform.ID, r io.Reader) error {
func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r io.Reader) error {
precision := s.Precision
if precision == "" {
precision = "ns"
}

if !models.ValidPrecision(precision) {
return &platform.Error{
Code: platform.EInvalid,
return &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/Write",
Msg: errInvalidPrecision,
}
Expand Down
Loading

0 comments on commit bb72fde

Please sign in to comment.