Skip to content

Commit

Permalink
Revert Series()/sync.Pool changes for now (#4609)
Browse files Browse the repository at this point in the history
* Revert "store: fix marshaling with sync.Pool (#4593)"

This reverts commit 8b4c3c9.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* Revert "[v2] store: reuse buffers for serializing Series() responses (#4535)"

This reverts commit 7a8d189.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS authored Aug 27, 2021
1 parent ba92c0c commit fdfc077
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 367 deletions.
6 changes: 0 additions & 6 deletions .bingo/Variables.mk
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,6 @@ $(PROTOC_GEN_GOGOFAST): $(BINGO_DIR)/protoc-gen-gogofast.mod
@echo "(re)installing $(GOBIN)/protoc-gen-gogofast-v1.3.2"
@cd $(BINGO_DIR) && $(GO) build -mod=mod -modfile=protoc-gen-gogofast.mod -o=$(GOBIN)/protoc-gen-gogofast-v1.3.2 "github.com/gogo/protobuf/protoc-gen-gogofast"

PROTOC_GO_INJECT_FIELD := $(GOBIN)/protoc-go-inject-field-v0.0.0-20170110051745-00204be12496
$(PROTOC_GO_INJECT_FIELD): $(BINGO_DIR)/protoc-go-inject-field.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
@echo "(re)installing $(GOBIN)/protoc-go-inject-field-v0.0.0-20170110051745-00204be12496"
@cd $(BINGO_DIR) && $(GO) build -mod=mod -modfile=protoc-go-inject-field.mod -o=$(GOBIN)/protoc-go-inject-field-v0.0.0-20170110051745-00204be12496 "github.com/favadi/protoc-go-inject-field"

SHFMT := $(GOBIN)/shfmt-v3.1.2
$(SHFMT): $(BINGO_DIR)/shfmt.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
Expand Down
5 changes: 0 additions & 5 deletions .bingo/protoc-go-inject-field.mod

This file was deleted.

2 changes: 0 additions & 2 deletions .bingo/variables.env
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,5 @@ PROMU="${GOBIN}/promu-v0.5.0"

PROTOC_GEN_GOGOFAST="${GOBIN}/protoc-gen-gogofast-v1.3.2"

PROTOC_GO_INJECT_FIELD="${GOBIN}/protoc-go-inject-field-v0.0.0-20170110051745-00204be12496"

SHFMT="${GOBIN}/shfmt-v3.1.2"

1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Changed
- [#4519](https://github.com/thanos-io/thanos/pull/4519) Query: switch to miekgdns DNS resolver as the default one.
- [#4535](https://github.com/thanos-io/thanos/pull/4535) Store: Reuse same buffer for Series() responses. On bigger queries this reduces Thanos Store memory usage by up to 50%.

## [v0.22.0](https://github.com/thanos-io/thanos/tree/release-0.22) - 2021.07.22

Expand Down
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ go-format: $(GOIMPORTS)

.PHONY: proto
proto: ## Generates Go files from Thanos proto files.
proto: check-git $(GOIMPORTS) $(PROTOC) $(PROTOC_GEN_GOGOFAST) $(PROTOC_GO_INJECT_FIELD)
@PROTOC_GO_INJECT_FIELD_BIN="$(PROTOC_GO_INJECT_FIELD)" GOIMPORTS_BIN="$(GOIMPORTS)" PROTOC_BIN="$(PROTOC)" PROTOC_GEN_GOGOFAST_BIN="$(PROTOC_GEN_GOGOFAST)" scripts/genproto.sh
proto: check-git $(GOIMPORTS) $(PROTOC) $(PROTOC_GEN_GOGOFAST)
@GOIMPORTS_BIN="$(GOIMPORTS)" PROTOC_BIN="$(PROTOC)" PROTOC_GEN_GOGOFAST_BIN="$(PROTOC_GEN_GOGOFAST)" scripts/genproto.sh

.PHONY: tarballs-release
tarballs-release: ## Build tarballs.
Expand Down Expand Up @@ -318,7 +318,6 @@ sync/atomic=go.uber.org/atomic" ./...
@go run ./scripts/copyright
@echo ">> ensuring generated proto files are up to date"
@$(MAKE) proto
@$(MAKE) format
$(call require_clean_work_tree,'detected files without copyright, run make lint and commit changes')

.PHONY: shell-lint
Expand Down
33 changes: 2 additions & 31 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,6 @@ type BucketStore struct {

// Enables hints in the Series() response.
enableSeriesResponseHints bool

// respPool is a sync.Pool for marshaling Series() responses.
respPool sync.Pool
}

type noopCache struct{}
Expand Down Expand Up @@ -399,7 +396,6 @@ func NewBucketStore(
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
respPool: sync.Pool{},
}

for _, option := range options {
Expand Down Expand Up @@ -1097,17 +1093,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.seriesGetAllDuration.Observe(stats.getAllDuration.Seconds())
s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried))
}

var resp *storepb.SeriesResponse
defer func(r **storepb.SeriesResponse) {
if *r != nil {
(*r).Close()
}
}(&resp)

// Merge the sub-results from each selected block.
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {

begin := time.Now()

// NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by
Expand All @@ -1128,15 +1115,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
}
series.Labels = labelpb.ZLabelsFromPromLabels(lset)

if resp == nil {
resp = storepb.NewSeriesResponseWithPool(&series, &s.respPool)
} else {
resp.Result = &storepb.SeriesResponse_Series{
Series: &series,
}
}
if err = srv.Send(resp); err != nil {
if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
return
}
Expand All @@ -1159,15 +1138,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
return
}

if resp == nil {
resp = storepb.NewHintsSeriesResponseWithPool(anyHints, &s.respPool)
} else {
resp.Result = &storepb.SeriesResponse_Hints{
Hints: anyHints,
}
}

if err = srv.Send(resp); err != nil {
if err = srv.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error())
return
}
Expand Down
1 change: 0 additions & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,7 +1482,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
queryGate: gate.NewNoop(),
chunksLimiterFactory: NewChunksLimiterFactory(0),
seriesLimiterFactory: NewSeriesLimiterFactory(0),
respPool: sync.Pool{},
}

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions pkg/store/hintspb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ package hintspb

import "github.com/oklog/ulid"

func (m *SeriesResponseHints) Len() int {
return len(m.QueriedBlocks)
}

func (m *SeriesResponseHints) AddQueriedBlock(id ulid.ULID) {
m.QueriedBlocks = append(m.QueriedBlocks, Block{
Id: id.String(),
Expand Down
4 changes: 4 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,13 +1547,17 @@ type storeSeriesServer struct {
SeriesSet []storepb.Series
Warnings []string
HintsSet []*types.Any

Size int64
}

func newStoreSeriesServer(ctx context.Context) *storeSeriesServer {
return &storeSeriesServer{ctx: ctx}
}

func (s *storeSeriesServer) Send(r *storepb.SeriesResponse) error {
s.Size += int64(r.Size())

if r.GetWarning() != "" {
s.Warnings = append(s.Warnings, r.GetWarning())
return nil
Expand Down
167 changes: 0 additions & 167 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -42,24 +41,6 @@ func NewSeriesResponse(series *Series) *SeriesResponse {
}
}

func NewSeriesResponseWithPool(series *Series, respPool *sync.Pool) *SeriesResponse {
return &SeriesResponse{
respPool: respPool,
Result: &SeriesResponse_Series{
Series: series,
},
}
}

func NewHintsSeriesResponseWithPool(hints *types.Any, respPool *sync.Pool) *SeriesResponse {
return &SeriesResponse{
respPool: respPool,
Result: &SeriesResponse_Hints{
Hints: hints,
},
}
}

func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
return &SeriesResponse{
Result: &SeriesResponse_Hints{
Expand Down Expand Up @@ -475,151 +456,3 @@ func CompareLabels(a, b []Label) int {
func LabelsToPromLabelsUnsafe(lset []Label) labels.Labels {
return labelpb.ZLabelsToPromLabels(lset)
}

// Type alias because protoc-go-inject-field does not support
// managing imports.
type syncPool = sync.Pool

// Close returns the memory used for marshaling, if any.
func (m *SeriesResponse) Close() {
if m == nil || m.respBuf == nil {
return
}

m.respPool.Put(m.respBuf)
m.respBuf = nil
}

// The following were copied/pasted from gogoprotobuf generated code with changes
// to make it work with sync.Pool / []byte slice.
func (m *SeriesResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()

var respBuf []byte

// Slow path with no sync.Pool.
if m.respPool == nil {
respBuf = make([]byte, size)

n, err := m.MarshalToSizedBuffer(respBuf)
if err != nil {
return nil, err
}
return respBuf[len(respBuf)-n:], nil
}

// Fast path with sync.Pool.
// m.respBuf must not be nil so that it would be returned to the pool.

// If no pre-allocated buffer has been passed then try to get a new one.
if m.respBuf == nil {
poolBuf := m.respPool.Get()
// No previous buffer found in the pool, try to allocate.
if poolBuf == nil {
respBuf = make([]byte, size)
} else {
// Found something, let's see if it is big enough.
respBuf = *(poolBuf.(*[]byte))
}
} else {
respBuf = *m.respBuf
}

// Last sanity check of the size before the marshaling.
if cap(respBuf) < size {
if m.respPool != nil {
m.respPool.Put(&respBuf)
}
respBuf = make([]byte, size)
}
m.respBuf = &respBuf

// Possibly trim it so that there wouldn't be left-over "garbage" in the slice.
// TODO: check if it is needed to always trim this.
marshalBuf := respBuf[:size]
n, err := m.MarshalToSizedBuffer(marshalBuf)
if err != nil {
return nil, err
}
return marshalBuf[len(marshalBuf)-n:], nil
}

func (m *SeriesResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}

type marshaler interface {
MarshalTo([]byte) (int, error)
}

func (m *SeriesResponse) MarshalToSizedBuffer(data []byte) (int, error) {
i := len(data)

if m.Result != nil {
size := m.Result.Size()
i -= size

if _, err := m.Result.(marshaler).MarshalTo(data[i:]); err != nil {
return 0, err
}
}
return len(data) - i, nil
}

func (m *SeriesResponse_Series) MarshalTo(data []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(data[:size])
}

func (m *SeriesResponse_Series) MarshalToSizedBuffer(data []byte) (int, error) {
i := len(data)
if m.Series != nil {
{
size, err := m.Series.MarshalToSizedBuffer(data[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRpc(data, i, uint64(size))
}
i--
data[i] = 0xa
}
return len(data) - i, nil
}
func (m *SeriesResponse_Warning) MarshalTo(data []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(data[:size])
}

func (m *SeriesResponse_Warning) MarshalToSizedBuffer(data []byte) (int, error) {
i := len(data)
i -= len(m.Warning)
copy(data[i:], m.Warning)
i = encodeVarintRpc(data, i, uint64(len(m.Warning)))
i--
data[i] = 0x12
return len(data) - i, nil
}
func (m *SeriesResponse_Hints) MarshalTo(data []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(data[:size])
}

func (m *SeriesResponse_Hints) MarshalToSizedBuffer(data []byte) (int, error) {
i := len(data)
if m.Hints != nil {
{
size, err := m.Hints.MarshalToSizedBuffer(data[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRpc(data, i, uint64(size))
}
i--
data[i] = 0x1a
}
return len(data) - i, nil
}
Loading

0 comments on commit fdfc077

Please sign in to comment.