Skip to content

Commit

Permalink
Distributor.queryIngesterStream: Free gRPC buffers upon error (#9756)
Browse files (browse the repository at this point in the history)
* Distributor.queryIngesterStream: Free gRPC buffers upon error

---------

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 authored Oct 30, 2024
1 parent f4ca071 commit eda1a4b
Showing 1 changed file with 23 additions and 3 deletions.
26 changes: 23 additions & 3 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,20 @@ type ingesterQueryResult struct {
chunkseriesBatches [][]ingester_client.TimeSeriesChunk
timeseriesBatches [][]mimirpb.TimeSeries
streamingSeries seriesChunksStream

// Retain responses owning referenced gRPC buffers, until they are freed.
responses []*ingester_client.QueryStreamResponse
}

// addResponse records resp in r.responses so that it stays reachable from the
// result and can later be handed to freeBuffers.
func (r *ingesterQueryResult) addResponse(resp *ingester_client.QueryStreamResponse) {
	retained := append(r.responses, resp)
	r.responses = retained
}

// freeBuffers calls FreeBuffer on every response recorded via addResponse, in
// the order they were added, and then resets r.responses to nil. A second call
// therefore has nothing left to iterate over.
func (r *ingesterQueryResult) freeBuffers() {
	retained := r.responses
	for i := range retained {
		retained[i].FreeBuffer()
	}
	// Drop the references so the responses are not freed twice.
	r.responses = nil
}

// queryIngesterStream queries the ingesters using the gRPC streaming API.
Expand All @@ -215,7 +229,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

// queryIngester MUST call cancelContext once processing is completed in order to release resources. It's required
// by ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation() to properly release resources.
queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (ingesterQueryResult, error) {
queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (result ingesterQueryResult, err error) {
log, ctx := spanlogger.NewWithLogger(ctx, d.log, "Distributor.queryIngesterStream")
cleanup := func() {
log.Span.Finish()
Expand All @@ -234,6 +248,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

cleanup()
}

if err != nil {
result.freeBuffers()
}
}()

log.Span.SetTag("ingester_address", ing.Addr)
Expand All @@ -249,15 +267,15 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

result := ingesterQueryResult{}

// Why retain the batches rather than iteratively build a single slice?
// If we iteratively build a single slice, we'll spend a lot of time copying elements as the slice grows beyond its capacity.
// So instead, we build the slice in one go once we know how many series we have.
var streamingSeriesBatches [][]labels.Labels
streamingSeriesCount := 0

for {
// XXX: Note that while we free responses' gRPC buffers on error, we don't do the same in case of success,
// as the combined response retains references to gRPC buffers.
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
// We will never get an EOF here from an ingester that is streaming chunks, so we don't need to do anything to set up streaming here.
Expand All @@ -266,6 +284,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

result.addResponse(resp)

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
Expand Down

0 comments on commit eda1a4b

Please sign in to comment.