Skip to content

Commit

Permalink
Fix buffered iterator (grafana#9976)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

In grafana#9700, we added support for
writing non-indexed labels from entries into chunks. That PR introduced
two bugs:

- When the buffered iterator is closed, the buffer to read metadata
labels is put back into the pool but not set to nil. Subsequent uses of
the iterator may write on top of the buffer that was put back on the
pool. This may lead to inconsistent/incorrect results.
- Inside the buffered iterator, we were not correctly handling EOFs
while reading the number of labels and each label length. This was due
to the `lastAttempt` variable not being reset.

This PR adds a new test for these two bugs and also fixes them.

**Which issue(s) this PR fixes**:
Fixes grafana#9700

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
  • Loading branch information
salvacorts authored Jul 19, 2023
1 parent d4f364b commit 78aad0b
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
5 changes: 4 additions & 1 deletion pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {

// TODO: This is pretty similar to how we read the line size, and the metadata name and value sizes
// Maybe we can extract it to a separate function and reuse it?
lastAttempt = 0
var labelsWidth, nLabels int
for labelsWidth == 0 { // Read until we have enough bytes for the labels.
n, err := si.reader.Read(si.readBuf[si.readBufValid:])
Expand Down Expand Up @@ -1288,7 +1289,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {

// If not enough space for the labels, create a new buffer slice and put the old one back in the pool.
metaLabelsBufLen := nLabels * 2
if metaLabelsBufLen > cap(si.metaLabelsBuf) {
if si.metaLabelsBuf == nil || metaLabelsBufLen > cap(si.metaLabelsBuf) {
if si.metaLabelsBuf != nil {
for i := range si.metaLabelsBuf {
if si.metaLabelsBuf[i] != nil {
Expand All @@ -1309,6 +1310,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
// Read all the label-value pairs, into the buffer slice.
for i := 0; i < metaLabelsBufLen; i++ {
// Read the length of the label.
lastAttempt = 0
var labelWidth, labelSize int
for labelWidth == 0 { // Read until we have enough bytes for the name.
n, err := si.reader.Read(si.readBuf[si.readBufValid:])
Expand Down Expand Up @@ -1395,6 +1397,7 @@ func (si *bufferedIterator) close() {
for i := range si.metaLabelsBuf {
if si.metaLabelsBuf[i] != nil {
BytesBufferPool.Put(si.metaLabelsBuf[i])
si.metaLabelsBuf[i] = nil
}
}
LabelsPool.Put(si.metaLabelsBuf)
Expand Down
52 changes: 52 additions & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,3 +1542,55 @@ func TestMemChunk_SpaceFor(t *testing.T) {
})
}
}

// TestMemChunk_IteratorWithNonIndexedLabels appends four entries carrying
// non-indexed labels to a chunk and checks that iterating the chunk yields
// every line together with its labels.
//
// The chunk is iterated twice, for every encoding in testEncoding. The second
// pass runs after the first iterator has been exhausted and closed, so it
// would surface state leaking from one iterator to the next (for example
// pooled buffers that are handed back but still referenced).
func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
	for _, enc := range testEncoding {
		enc := enc
		t.Run(enc.String(), func(t *testing.T) {
			chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize)
			require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{
				{Name: "traceID", Value: "123"},
				{Name: "user", Value: "a"},
			})))
			require.NoError(t, chk.Append(logprotoEntryWithMetadata(2, "lineB", []logproto.LabelAdapter{
				{Name: "traceID", Value: "456"},
				{Name: "user", Value: "b"},
			})))
			// cut() is called between the two pairs of entries; presumably this
			// leaves the first pair in a cut block and the second pair in the
			// head block, so both read paths are exercised — confirm against
			// memchunk.go.
			require.NoError(t, chk.cut())
			require.NoError(t, chk.Append(logprotoEntryWithMetadata(3, "lineC", []logproto.LabelAdapter{
				{Name: "traceID", Value: "789"},
				{Name: "user", Value: "c"},
			})))
			require.NoError(t, chk.Append(logprotoEntryWithMetadata(4, "lineD", []logproto.LabelAdapter{
				{Name: "traceID", Value: "123"},
				{Name: "user", Value: "d"},
			})))

			expectedLines := []string{"lineA", "lineB", "lineC", "lineD"}
			expectedStreams := []string{
				labels.FromStrings("traceID", "123", "user", "a").String(),
				labels.FromStrings("traceID", "456", "user", "b").String(),
				labels.FromStrings("traceID", "789", "user", "c").String(),
				labels.FromStrings("traceID", "123", "user", "d").String(),
			}

			// We will run the test twice so the iterator will be created twice.
			// This is to ensure that the iterator is correctly closed.
			for i := 0; i < 2; i++ {
				it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
				require.NoError(t, err)

				var lines []string
				var streams []string
				for it.Next() {
					require.NoError(t, it.Error())
					e := it.Entry()
					lines = append(lines, e.Line)
					streams = append(streams, logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels).String())
				}
				// Next() returning false can mean either exhaustion or failure;
				// check the error once more so a failure that ends the loop is
				// reported as such rather than as a confusing element mismatch.
				require.NoError(t, it.Error())
				// Close explicitly so the second pass always runs against a
				// closed first iterator, independent of whether exhaustion
				// closes it implicitly.
				require.NoError(t, it.Close())

				assert.ElementsMatch(t, expectedLines, lines)
				assert.ElementsMatch(t, expectedStreams, streams)
			}
		})
	}
}

0 comments on commit 78aad0b

Please sign in to comment.