Skip to content

Commit

Permalink
feat: Collect values written stats (#19187)
Browse files Browse the repository at this point in the history
* feat(engine/tsm1): Add WritePointsWithContext()

Add WritePointsWithContext() and make WritePoints() a thin wrapper for
it.

The purpose is to add statistics context values that we'll use to
propagate the number of fields and points written to callers up the call
chain.

* feat(tsdb): Add WriteToShardWithContext()

When applied, this patch adds WriteToShardWithContext() and wraps it
with WriteToShard() to preserve the API.

The purpose of this addition is to propagate a context.Context value
to Shard.WritePointsWithContext().

* feat(tsdb/shard): Add WritePointsWithContext()

The purpose of adding WritePointsWithContext() is to propagate context
values down to engine code and propagate statistics via the context.Value
up to callers.

This patch also adds values written statistics to the shard.

* feat(http): Gather values written stats

WritePointsWithContext() was added to propagate context values down to
the engine and communicate stats to the caller.

* feat(http): Gather values written stats

WritePointsWithContext() was added to propagate context values down to
the engine and communicate stats to the caller.

* refactor: Change MetricKey to ContextKey

This patch gives the type we're using for context keys a better name.
  • Loading branch information
ayang64 authored Aug 12, 2020
1 parent 8eade84 commit 6ce0e11
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 34 deletions.
105 changes: 82 additions & 23 deletions coordinator/points_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator

import (
"context"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -282,13 +283,37 @@ func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error {
return w.WritePointsPrivileged(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points)
}

// WritePoints writes the data to the underlying storage. consistencyLevel and
// user are only used for clustered scenarios.
//
// It is a thin wrapper around WritePointsWithContext that passes
// context.Background() as the context.
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
	return w.WritePointsWithContext(context.Background(), database, retentionPolicy, consistencyLevel, user, points)
}

// ContextKey is the type of the context.Context value keys that callers use to
// request write statistics from the points writer.
type ContextKey int

// Context keys looked up by the points writer while writing to shards. The
// value stored under each key is expected to be a *int64; the writer adds the
// per-shard totals to it atomically.
const (
// StatPointsWritten keys the *int64 that accumulates the number of points written.
StatPointsWritten = ContextKey(iota)
// StatValuesWritten keys the *int64 that accumulates the number of values (fields) written.
StatValuesWritten
)

// WritePointsWithContext writes data to the underlying storage. consistencyLevel and user are only used for clustered scenarios.
//
// It forwards ctx and every argument except user to
// WritePointsPrivilegedWithContext and returns that call's error.
func (w *PointsWriter) WritePointsWithContext(ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
return w.WritePointsPrivilegedWithContext(ctx, database, retentionPolicy, consistencyLevel, points)
}

// WritePointsPrivileged writes the data to the underlying storage, consistencyLevel is only used for clustered scenarios.
//
// It is a wrapper around WritePointsPrivilegedWithContext that passes
// context.Background() as the context.
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
return w.WritePointsPrivilegedWithContext(context.Background(), database, retentionPolicy, consistencyLevel, points)
}

// WritePointsPrivilegedWithContext writes the data to the underlying storage,
// consistencyLevel is only used for clustered scenarios
//
// If a request for StatPointsWritten or StatValuesWritten of type ContextKey is
// sent via context values, this stores the total points and fields written in
// the memory pointed to by the associated int64 pointers.
//
func (w *PointsWriter) WritePointsPrivilegedWithContext(ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
atomic.AddInt64(&w.stats.WriteReq, 1)
atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points)))

Expand All @@ -308,13 +333,26 @@ func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, c
// Write each shard in its own goroutine and return as soon as one fails.
ch := make(chan error, len(shardMappings.Points))
for shardID, points := range shardMappings.Points {
go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
err := w.writeToShard(shard, database, retentionPolicy, points)
go func(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
var numPoints, numValues int64
ctx = context.WithValue(ctx, tsdb.StatPointsWritten, &numPoints)
ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues)

err := w.writeToShardWithContext(ctx, shard, database, retentionPolicy, points)
if err == tsdb.ErrShardDeletion {
err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}
}

if v, ok := ctx.Value(StatPointsWritten).(*int64); ok {
atomic.AddInt64(v, numPoints)
}

if v, ok := ctx.Value(StatValuesWritten).(*int64); ok {
atomic.AddInt64(v, numValues)
}

ch <- err
}(shardMappings.Shards[shardID], database, retentionPolicy, points)
}(ctx, shardMappings.Shards[shardID], database, retentionPolicy, points)
}

// Send points to subscriptions if possible.
Expand Down Expand Up @@ -365,30 +403,51 @@ func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, c

// writeToShard writes points to a shard. It is a wrapper around
// writeToShardWithContext that passes context.Background() as the context.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
return w.writeToShardWithContext(context.Background(), shard, database, retentionPolicy, points)
}

func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

err := w.TSDBStore.WriteToShard(shard.ID, points)
if err == nil {
atomic.AddInt64(&w.stats.WriteOK, 1)
// This is a small wrapper to make type-switching over w.TSDBStore a little
// less verbose.
writeToShard := func() error {
type shardWriterWithContext interface {
WriteToShardWithContext(context.Context, uint64, []models.Point) error
}
switch sw := w.TSDBStore.(type) {
case shardWriterWithContext:
if err := sw.WriteToShardWithContext(ctx, shard.ID, points); err != nil {
return err
}
default:
if err := w.TSDBStore.WriteToShard(shard.ID, points); err != nil {
return err
}
}
return nil
}

// Except tsdb.ErrShardNotFound no error can be handled here
if err != tsdb.ErrShardNotFound {
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}

// If we've written to shard that should exist on the current node, but the store has
// not actually created this shard, tell it to create it and retry the write
if err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}
if err := writeToShard(); err == tsdb.ErrShardNotFound {
// Shard doesn't exist -- lets create it and try again..

// If we've written to shard that should exist on the current node, but the
// store has not actually created this shard, tell it to create it and
// retry the write
if err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}

if err = w.TSDBStore.WriteToShard(shard.ID, points); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
// Now that we've created the shard, try to write to it again.
if err := writeToShard(); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}
} else {
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}
Expand Down
28 changes: 27 additions & 1 deletion services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
Expand Down Expand Up @@ -352,6 +353,7 @@ type Statistics struct {
WriteRequestBytesReceived int64
QueryRequestBytesTransmitted int64
PointsWrittenOK int64
ValuesWrittenOK int64
PointsWrittenDropped int64
PointsWrittenFail int64
AuthenticationFailures int64
Expand Down Expand Up @@ -383,6 +385,7 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
statWriteRequestBytesReceived: atomic.LoadInt64(&h.stats.WriteRequestBytesReceived),
statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted),
statPointsWrittenOK: atomic.LoadInt64(&h.stats.PointsWrittenOK),
statValuesWrittenOK: atomic.LoadInt64(&h.stats.ValuesWrittenOK),
statPointsWrittenDropped: atomic.LoadInt64(&h.stats.PointsWrittenDropped),
statPointsWrittenFail: atomic.LoadInt64(&h.stats.PointsWrittenFail),
statAuthFail: atomic.LoadInt64(&h.stats.AuthenticationFailures),
Expand Down Expand Up @@ -968,8 +971,31 @@ func (h *Handler) serveWrite(database, retentionPolicy, precision string, w http
}
}

type pointsWriterWithContext interface {
WritePointsWithContext(context.Context, string, string, models.ConsistencyLevel, meta.User, []models.Point) error
}

// writePoints hands the parsed points to the configured PointsWriter. When the
// writer also implements pointsWriterWithContext, the context-aware entry point
// is used so the number of values written can be collected into the handler's
// statistics; otherwise it falls back to plain WritePoints.
writePoints := func() error {
	ctxWriter, ok := h.PointsWriter.(pointsWriterWithContext)
	if !ok {
		// No context-aware writer available, so no values-written stats.
		return h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points)
	}

	var pointCount, valueCount int64
	ctx := context.WithValue(context.Background(), coordinator.StatPointsWritten, &pointCount)
	ctx = context.WithValue(ctx, coordinator.StatValuesWritten, &valueCount)

	// For now only the values count is recorded. It is added regardless of
	// whether the write returned an error.
	err := ctxWriter.WritePointsWithContext(ctx, database, retentionPolicy, consistency, user, points)
	atomic.AddInt64(&h.stats.ValuesWrittenOK, valueCount)
	return err
}

// Write points.
if err := h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points); influxdb.IsClientError(err) {
if err := writePoints(); influxdb.IsClientError(err) {
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
h.httpError(w, err.Error(), http.StatusBadRequest)
return
Expand Down
1 change: 1 addition & 0 deletions services/httpd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
statWriteRequestBytesReceived = "writeReqBytes" // Sum of all bytes in write requests.
statQueryRequestBytesTransmitted = "queryRespBytes" // Sum of all bytes returned in query reponses.
statPointsWrittenOK = "pointsWrittenOK" // Number of points written OK.
statValuesWrittenOK = "valuesWrittenOK" // Number of values (fields) written OK.
statPointsWrittenDropped = "pointsWrittenDropped" // Number of points dropped by the storage engine.
statPointsWrittenFail = "pointsWrittenFail" // Number of points that failed to be written.
statAuthFail = "authFail" // Number of authentication failures.
Expand Down
41 changes: 39 additions & 2 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,22 +1268,46 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
return nil
}

// WritePoints writes metadata and point data into the engine.
// It returns an error if new points are added to an existing key.
//
// WritePoints is a thin wrapper for WritePointsWithContext that passes
// context.Background() as the context.
//
// TODO: We should consider obsoleting and removing this function in favor of
// WritePointsWithContext()
func (e *Engine) WritePoints(points []models.Point) error {
return e.WritePointsWithContext(context.Background(), points)
}

// WritePointsWithContext() writes metadata and point data into the engine. It
// returns an error if new points are added to an existing key.
//
// In addition, it accepts a context.Context value. It stores write statistics
// to context values passed in of type tsdb.ContextKey. The metrics it stores
// are points written and values (fields) written.
//
// It expects int64 pointers to be stored in the tsdb.StatPointsWritten and
// tsdb.StatValuesWritten keys and will store the proper values if requested.
//
func (e *Engine) WritePointsWithContext(ctx context.Context, points []models.Point) error {
values := make(map[string][]Value, len(points))
var (
keyBuf []byte
baseLen int
seriesErr error
npoints int64 // total points processed
nvalues int64 // total values (fields) processed
)

for _, p := range points {
// TODO: In the future we'd like to check ctx.Err() for cancellation here.
// Beforehand we should measure the performance impact.

keyBuf = append(keyBuf[:0], p.Key()...)
keyBuf = append(keyBuf, keyFieldSeparator...)
baseLen = len(keyBuf)
iter := p.FieldIterator()
t := p.Time().UnixNano()

npoints++
for iter.Next() {
// Skip fields name "time", they are illegal
if bytes.Equal(iter.FieldKey(), timeBytes) {
Expand Down Expand Up @@ -1353,6 +1377,8 @@ func (e *Engine) WritePoints(points []models.Point) error {
default:
return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())
}

nvalues++
values[string(keyBuf)] = append(values[string(keyBuf)], v)
}
}
Expand All @@ -1370,6 +1396,17 @@ func (e *Engine) WritePoints(points []models.Point) error {
return err
}
}

// if requested, store points written stats
if pointsWritten, ok := ctx.Value(tsdb.StatPointsWritten).(*int64); ok {
*pointsWritten = npoints
}

// if requested, store values written stats
if valuesWritten, ok := ctx.Value(tsdb.StatValuesWritten).(*int64); ok {
*valuesWritten = nvalues
}

return seriesErr
}

Expand Down
34 changes: 34 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2093,6 +2093,40 @@ func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
}
})
}

}

// TestEngine_WritePointsWithContext verifies that WritePointsWithContext
// reports the number of points and values (fields) written through the int64
// pointers stored under the tsdb.StatPointsWritten and tsdb.StatValuesWritten
// context keys. The check is repeated for every registered index type.
func TestEngine_WritePointsWithContext(t *testing.T) {
	// Two points carrying three field values in total.
	points := []models.Point{
		MustParsePointString("cpu,host=A value=1.1 1000000000"),
		MustParsePointString("cpu,host=B value=1.2,value2=8 2000000000"),
	}

	expectedPoints, expectedValues := int64(2), int64(3)

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			// Release the engine when the subtest finishes so each index
			// type does not leave an open engine behind.
			defer e.Close()

			var numPoints, numValues int64

			// Request both statistics by handing the engine pointers to
			// write its counts into.
			ctx := context.WithValue(context.Background(), tsdb.StatPointsWritten, &numPoints)
			ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues)

			if err := e.WritePointsWithContext(ctx, points); err != nil {
				t.Fatalf("failed to write points: %v", err)
			}

			if got, expected := numPoints, expectedPoints; got != expected {
				t.Fatalf("Expected stats to return %d points; got %d", expected, got)
			}

			if got, expected := numValues, expectedValues; got != expected {
				t.Fatalf("Expected stats to return %d values; got %d", expected, got)
			}
		})
	}
}

func TestEngine_WritePoints_TypeConflict(t *testing.T) {
Expand Down
Loading

0 comments on commit 6ce0e11

Please sign in to comment.