Skip to content

Conversation

kasia-kujawa
Copy link
Contributor

Description:

Do not update fingerprint bytes when less data has been read than it was read during initialisation of fingerprint bytes

Link to tracking Issue:

#22936

Testing:

Manual tests - done
Unit test - in progress

Documentation:

@github-actions github-actions bot requested a review from djaglowski May 30, 2023 09:00
@kasia-kujawa kasia-kujawa force-pushed the kk-fix-stanza-fileconsumer branch 2 times, most recently from 9d6578e to d56a30d Compare May 30, 2023 13:44
@kasia-kujawa
Copy link
Contributor Author

I'm checking failures of unit tests.

@djaglowski
Copy link
Member

@kkujawa-sumo, thanks for reporting this and opening the PR.

I believe this may be related to some observations made by @maokitty in #19767, #19428, and #20851. Unfortunately, we were not able to resolve the issues yet.

@kasia-kujawa kasia-kujawa force-pushed the kk-fix-stanza-fileconsumer branch 3 times, most recently from 310f5e6 to e8513a5 Compare June 1, 2023 10:32
@kasia-kujawa kasia-kujawa force-pushed the kk-fix-stanza-fileconsumer branch from e8513a5 to 6e9dff0 Compare June 1, 2023 10:52
@kasia-kujawa
Copy link
Contributor Author

kasia-kujawa commented Jun 1, 2023

It looks like unit tests failed for commit because of additional operation in Read function, to see similar problem in my environment I had to modify test to make rotation very very fast - so probably problems reported by @maokitty showed up.

@djaglowski I propose another approach to fix problem that I found (@22936) increasing scanner buffer to fingerprint size to enable reading the whole fingerprint in single iteration, please see this. What do you think about this approach?

Copy link
Member

@djaglowski djaglowski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still not clear to me why this scenario fails. Can you describe the failure mechanism in more detail?

Comment on lines +86 to +94
func (b *readerBuilder) withBufferSize() *readerBuilder {
if b.readerConfig.fingerprintSize > defaultBufSize {
b.bufferSize = b.readerConfig.fingerprintSize
} else {
b.bufferSize = defaultBufSize
}
return b
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to always set the buffer size like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now default buffer size is set to 16 kb, default fingerprint size is set 1kb in filelogreceiver so the difference is significant, lower buffer for reading could impact speed of reading from the log file (more read operations with smaller buffer), higher fingerprint size could cause higher memory consumption - more data kept in memory to store fingerprint size.

Maybe consequences of setting always b.bufferSize = b.readerConfig.fingerprintSize will not be visible but the issue which I observed exists only for cases when fingerprint size is higher than the buffer.

@@ -32,6 +32,7 @@ type Reader struct {
processFunc EmitFunc

Fingerprint *Fingerprint
bufferSize int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be on readerConfig instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change this

@kasia-kujawa kasia-kujawa force-pushed the kk-fix-stanza-fileconsumer branch from 6e9dff0 to da83b13 Compare June 2, 2023 08:27
…t size

to enable reading the whole fingerprint in single iteration

Signed-off-by: Katarzyna Kujawa <kkujawa@sumologic.com>
@kasia-kujawa kasia-kujawa force-pushed the kk-fix-stanza-fileconsumer branch from da83b13 to 2689991 Compare June 2, 2023 08:29
@kasia-kujawa
Copy link
Contributor Author

kasia-kujawa commented Jun 5, 2023

@djaglowski more details

This will describe what happens in Read function now when generated_log.txt is read

in initialization fingerprint is read,

  • length of fingerprint: 16450 (equal to length of the log file),
  • in configuration fingerprint_size: 17408
  • scanner buffer size 16384 B = 16 kB
  1. the 1s execution of Read function
  • at the beginning: len(r.Fingerprint.FirstBytes): 16450 offset: 0 read Bytes: 0
  • Reading of the file r.file.Read(dst)
    • all read Bytes from file: 16384 offset 0 appendCount 16384
    • new fingerprint r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes[:r.Offset], dst[:appendCount]...)
      so new size of fingerprint is 16384
  1. the 2nd execution of Read function

    • at the beginning: len(r.Fingerprint.FirstBytes): 16384 offset: 16330 read Bytes: 16384
    • Reading of the file r.file.Read(dst)
      • read Bytes: 16384 offset 0 appendCount 16384
      • all read Bytes from file: 16450 (the whole file was read), in this execution of r.file.Read(dst) 66 bytes was read
      • offset: 16330, appendCount 66
      • again new fingerprint is set
        r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes[:r.Offset], dst[:appendCount]...) so it is
        r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes[:r.16330], dst[:66]...)
        so new size of fingerprint is 16330 + 66 = 16396 and 54 bytes (16384 - 16330) are missed from fingerprint
        so last character of new fingerprint are different (in comparison of fingerprints there are not equal!) because os.File.Read() reads only unread bytes from the file
        (below explanation what happens in Scan() function and why log file is correctly read)
  2. the3rd execution of Read function

    • at the beginning: len(r.Fingerprint.FirstBytes): 16396 offset: 16450 read Bytes: 16450
    • in line
      there is condition int(r.Offset) > len(r.Fingerprint.FirstBytes) and now offset is higher than the length of the fingerprint so Read will try to Read something without update of fingerprint, in this line

Checking what happens Scan() function from bufio package

  1. Read() reads 16384 bytes from file
  2. split() splits 16384 bytes and until s.start=16330 s.end=16384 (s.end - s.start = 54 bytes)
  3. buffer is copied and 54 bytes which left after splitting are copied into beginning of the buffer, see this line
    new value of s.end is 54 see this
  4. Read() reads file again to the buf[54:16384]
  5. split() splits buf
  6. loop finishes because of EOF

and I checked how os.File.Read() reads data from file using this simple code:

package main

import (
	"fmt"
	"log"
	"os"
)

func main() {

	file, err := os.Open("test.txt") // For read access.
	defer file.Close()
	if err != nil {
		log.Fatal(err)
	}

	data := make([]byte, 5)
	count, err := file.Read(data)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d bytes: %q\n", count, data[:count])

	data1 := make([]byte, 5)
	count1, err1 := file.Read(data1)
	if err != nil {
		log.Fatal(err1)
	}
	fmt.Printf("read %d bytes: %q\n", count1, data1[:count1])

}

for the file:

0123456789

the output is

read 5 bytes: "01234"
read 5 bytes: "56789"

and same situation is in the code of fileconsumer, os.File.Read() reads only unread data and data which left after splitting are kept in the buffer

@djaglowski
Copy link
Member

@kkujawa-sumo, thanks for detailing what happens in the scenario.

I am still having difficultly understanding the root cause of this issue, but this stands out to me as a problem:

  1. the 2nd execution of Read function
  • at the beginning: len(r.Fingerprint.FirstBytes): 16384 offset: 16330 read Bytes: 16384

Specifically, this seems to indicate that the offset has been incorrectly updated. If this is indeed the case, then I think the proposed change may only avoid the issue, rather than solve it.

@djaglowski
Copy link
Member

I looked into this some more and believe the error lies in the Reader.Read function. I haven't pinned down exactly what was wrong. However, I rewrote the function to explicitly handle each possible case. This appears to pass the test you wrote. @kkujawa-sumo, do you mind trying this implementation?

// Read from the file and update the fingerprint if necessary
func (r *Reader) Read(dst []byte) (n int, err error) {
	n, err = r.file.Read(dst)

	// CASE 0: Steady state
	//   Just return what was read.
	if len(r.Fingerprint.FirstBytes) == r.fingerprintSize {
		return
	}

	// CASE 1: Fingerprint building
	//   The file is small or being read for the first time.
	//   It is the only condition in which we should update the fingerprint.
	//   Handled before edge cases in order to avoid unnecessary work.
	if len(r.Fingerprint.FirstBytes) < r.fingerprintSize && len(r.Fingerprint.FirstBytes) == int(r.Offset) {
		appendCount := r.fingerprintSize - len(r.Fingerprint.FirstBytes)
		if appendCount > n {
			appendCount = n
		}
		r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes, dst[:appendCount]...)
		return
	}

	// The remaining cases can happen when the following has occured:
	//   1. The component was started with a storage extension and saved a (partial or full) fingerprint for the file.
	//   2. The component was restarted with the same storage extension and a different setting for 'fingerprint_size'.

	// CASE 2: Oversized fingerprint
	//   The component was restarted with a decreased 'fingerprint_size'. Just keep reading.
	if len(r.Fingerprint.FirstBytes) > r.fingerprintSize {
		return
	}

	// CASE 3: Undersized fingerprint
	//   At this point we know that the fingerprint is undersized (didn't match CASE 0 or CASE 2).
	//   However, we also know that the fingerprint and offset are not aligned (didn't match second condition of CASE 1).
	//   Therefore, we cannot update the fingerprint because we are not reading from the correct staring position.
	return
}

If you agree it works, I'll continue on this tomorrow by writing a set of tests to validate each case.

@kasia-kujawa
Copy link
Contributor Author

@kkujawa-sumo, thanks for detailing what happens in the scenario.

I am still having difficultly understanding the root cause of this issue, but this stands out to me as a problem:

  1. the 2nd execution of Read function
  • at the beginning: len(r.Fingerprint.FirstBytes): 16384 offset: 16330 read Bytes: 16384

Specifically, this seems to indicate that the offset has been incorrectly updated. If this is indeed the case, then I think the proposed change may only avoid the issue, rather than solve it.

Offset is set to the end of the line but amount of bytes read from the file is larger ( offset != read bytes), some data are in the buffer and they waits for next iteration of reading.

last characters read in first iteration of reading

2023-05-30 09:02:52.577515 +0200 CEST m=+0.009851256 line 137 log abcdefghijklmnopqrstuvwxyz1234567890, end of line 137
2023-05-30 09:02:52.577518 +0200 CEST m=+0.009853936 l

offset is set after end of line 137
2023-05-30 09:02:52.577518 +0200 CEST m=+0.009853936 l is kept in the buffer
in next iteration of the reading following characters will be read

ine 138 log abcdefghijklmnopqrstuvwxyz1234567890, end of line 138

I think that offset is correctly set - at the end of fully processed line and the root cause is that the offset was treated as the indicator of read bytes form the file what is not true because some bytes are temporary kept in the buffer (when the reading finishes reading in the middle of the line).

@kasia-kujawa
Copy link
Contributor Author

I looked into this some more and believe the error lies in the Reader.Read function. I haven't pinned down exactly what was wrong. However, I rewrote the function to explicitly handle each possible case. This appears to pass the test you wrote. @kkujawa-sumo, do you mind trying this implementation?

// Read from the file and update the fingerprint if necessary
func (r *Reader) Read(dst []byte) (n int, err error) {
	n, err = r.file.Read(dst)

	// CASE 0: Steady state
	//   Just return what was read.
	if len(r.Fingerprint.FirstBytes) == r.fingerprintSize {
		return
	}

	// CASE 1: Fingerprint building
	//   The file is small or being read for the first time.
	//   It is the only condition in which we should update the fingerprint.
	//   Handled before edge cases in order to avoid unnecessary work.
	if len(r.Fingerprint.FirstBytes) < r.fingerprintSize && len(r.Fingerprint.FirstBytes) == int(r.Offset) {
		appendCount := r.fingerprintSize - len(r.Fingerprint.FirstBytes)
		if appendCount > n {
			appendCount = n
		}
		r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes, dst[:appendCount]...)
		return
	}

	// The remaining cases can happen when the following has occured:
	//   1. The component was started with a storage extension and saved a (partial or full) fingerprint for the file.
	//   2. The component was restarted with the same storage extension and a different setting for 'fingerprint_size'.

	// CASE 2: Oversized fingerprint
	//   The component was restarted with a decreased 'fingerprint_size'. Just keep reading.
	if len(r.Fingerprint.FirstBytes) > r.fingerprintSize {
		return
	}

	// CASE 3: Undersized fingerprint
	//   At this point we know that the fingerprint is undersized (didn't match CASE 0 or CASE 2).
	//   However, we also know that the fingerprint and offset are not aligned (didn't match second condition of CASE 1).
	//   Therefore, we cannot update the fingerprint because we are not reading from the correct staring position.
	return
}

If you agree it works, I'll continue on this tomorrow by writing a set of tests to validate each case.

@djaglowski I'm ok with fixing the problem in Read() function and your proposition looks good :)
I tested this proposition with two log files for which I saw the issue before and I don't see issue.

I also wrote test with line longer than default buffer and larger fingerprintsize - it passes:

func TestReadingWithLargeFingerPrintSizeAndLineLargerThanScannerBuf(t *testing.T) {
	t.Parallel()
	tempDir := t.TempDir()

	lineLength := defaultBufSize + 1024
	fingerPrintSize := defaultBufSize + 2*1024

	// Generate log lines
	body := "0abcdefghijklmnopqrstuvwxyz"
	firstline := ""
	for len(firstline) < lineLength {
		firstline += body
	}
	firstline = fmt.Sprintf("%s\n", firstline[:lineLength-1])
	require.Equal(t, lineLength, len(firstline))

	// Create a new file
	temp := openTemp(t, tempDir)
	writeString(t, temp, firstline)

	// Create reader
	emitChan := make(chan *emitParams, 1000)
	splitterConfig := helper.NewSplitterConfig()
	f := &readerFactory{
		SugaredLogger: testutil.Logger(t),
		readerConfig: &readerConfig{
			fingerprintSize: fingerPrintSize,
			maxLogSize:      defaultMaxLogSize,
			emit:            testEmitFunc(emitChan),
		},
		fromBeginning:   true,
		splitterFactory: newMultilineSplitterFactory(splitterConfig),
		encodingConfig:  splitterConfig.EncodingConfig,
	}

	r, err := f.newReaderBuilder().withFile(temp).withBufferSize().build()
	require.NoError(t, err)

	initialFingerPrintSize := len(r.Fingerprint.FirstBytes)
	initialFingerPrint := string(r.Fingerprint.FirstBytes)
	r.ReadToEnd(context.Background())
	require.Equal(t, initialFingerPrintSize, len(r.Fingerprint.FirstBytes))
	require.Equal(t, initialFingerPrint, string(r.Fingerprint.FirstBytes))

	require.Equal(t, []byte(firstline[:lineLength-1]), readToken(t, emitChan))
}

@djaglowski
Copy link
Member

Thanks for confirming @kkujawa-sumo. I've opened #23183. Perhaps if it's merged you'll rebase to add your test?

@kasia-kujawa
Copy link
Contributor Author

kasia-kujawa commented Jun 7, 2023

Thanks for confirming @kkujawa-sumo. I've opened #23183. Perhaps if it's merged you'll rebase to add your test?

@djaglowski Thanks for help with this issue ❤️
I'll check test cases which you added in #23183 and if I see that my tests adds something new I'll open pull request ;)

djaglowski added a commit that referenced this pull request Feb 27, 2024
Depends on #31298

Fixes #22936

This PR changes the way readers update their fingerprints.

Currently, when `reader.ReadToEnd` is called, it creates a scanner and
passes itself (the reader) in as the `io.Reader` so that a custom
implementation of `Read` will be used by the scanner. Each time the
scanner calls `Read`, we try to perform appropriate reasoning about
whether the data we've read should be appended to the fingerprint. The
problem is that the correct positioning of the bytes buffer is in some
rare cases different than the file's "offset", as we track it. See
example
[here](#22937 (comment)).

There appear to be two ways to solve this. A "simple" solution is to
independently determine the file handle's current offset with a clever
use of `Seek`, ([suggested
here](https://stackoverflow.com/a/10901436/3511338). Although this does
appear to work, it leaves open the possibility that the fingerprint is
corrupted because _if the file was truncated_, we may be updating the
fingerprint with incorrect information.

The other solution, proposed in this PR, simply has the custom `Read`
function set a flag to indicate that the fingerprint _should_ be
updated. Then, just before returning from `ReadToEnd`, we create an
entirely new fingerprint. This has the advantage of not having to manage
any kind of append operations, but also allows the the opportunity to
independently check that the fingerprint has not been altered by
truncation.

Benchmarks appear to show all three solutions are close in performance.
XinRanZhAWS pushed a commit to XinRanZhAWS/opentelemetry-collector-contrib that referenced this pull request Mar 13, 2024
Depends on open-telemetry#31298

Fixes open-telemetry#22936

This PR changes the way readers update their fingerprints.

Currently, when `reader.ReadToEnd` is called, it creates a scanner and
passes itself (the reader) in as the `io.Reader` so that a custom
implementation of `Read` will be used by the scanner. Each time the
scanner calls `Read`, we try to perform appropriate reasoning about
whether the data we've read should be appended to the fingerprint. The
problem is that the correct positioning of the bytes buffer is in some
rare cases different than the file's "offset", as we track it. See
example
[here](open-telemetry#22937 (comment)).

There appear to be two ways to solve this. A "simple" solution is to
independently determine the file handle's current offset with a clever
use of `Seek`, ([suggested
here](https://stackoverflow.com/a/10901436/3511338). Although this does
appear to work, it leaves open the possibility that the fingerprint is
corrupted because _if the file was truncated_, we may be updating the
fingerprint with incorrect information.

The other solution, proposed in this PR, simply has the custom `Read`
function set a flag to indicate that the fingerprint _should_ be
updated. Then, just before returning from `ReadToEnd`, we create an
entirely new fingerprint. This has the advantage of not having to manage
any kind of append operations, but also allows the the opportunity to
independently check that the fingerprint has not been altered by
truncation.

Benchmarks appear to show all three solutions are close in performance.
ghost pushed a commit to opsramp/opentelemetry-collector-contrib that referenced this pull request May 5, 2024
Depends on open-telemetry#31298

Fixes open-telemetry#22936

This PR changes the way readers update their fingerprints.

Currently, when `reader.ReadToEnd` is called, it creates a scanner and
passes itself (the reader) in as the `io.Reader` so that a custom
implementation of `Read` will be used by the scanner. Each time the
scanner calls `Read`, we try to perform appropriate reasoning about
whether the data we've read should be appended to the fingerprint. The
problem is that the correct positioning of the bytes buffer is in some
rare cases different than the file's "offset", as we track it. See
example
[here](open-telemetry#22937 (comment)).

There appear to be two ways to solve this. A "simple" solution is to
independently determine the file handle's current offset with a clever
use of `Seek`, ([suggested
here](https://stackoverflow.com/a/10901436/3511338). Although this does
appear to work, it leaves open the possibility that the fingerprint is
corrupted because _if the file was truncated_, we may be updating the
fingerprint with incorrect information.

The other solution, proposed in this PR, simply has the custom `Read`
function set a flag to indicate that the fingerprint _should_ be
updated. Then, just before returning from `ReadToEnd`, we create an
entirely new fingerprint. This has the advantage of not having to manage
any kind of append operations, but also allows the the opportunity to
independently check that the fingerprint has not been altered by
truncation.

Benchmarks appear to show all three solutions are close in performance.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants