Skip to content

Commit 17a4577

Browse files
committed
consolidate buffer logic
1 parent 7b8b190 commit 17a4577

File tree

2 files changed

+135
-77
lines changed

2 files changed

+135
-77
lines changed

pkg/buffer/buffer.go

Lines changed: 71 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package buffer
22

33
import (
4-
"bufio"
4+
"bytes"
55
"fmt"
66
"io"
77
"net/http"
@@ -41,73 +41,82 @@ func ProcessResponseAsRingBufferToEnd(httpResp *http.Response, maxJobLogLines in
4141
totalLines := 0
4242
writeIndex := 0
4343

44-
scanner := bufio.NewScanner(httpResp.Body)
45-
// Set initial buffer to 64KB and max token size to 10MB to handle very long lines
46-
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
44+
const readBufferSize = 64 * 1024 // 64KB read buffer
45+
const maxDisplayLength = 1000 // Keep first 1000 chars of truncated lines
4746

48-
for scanner.Scan() {
49-
line := scanner.Text()
50-
totalLines++
51-
52-
lines[writeIndex] = line
53-
validLines[writeIndex] = true
54-
writeIndex = (writeIndex + 1) % maxJobLogLines
55-
}
56-
57-
if err := scanner.Err(); err != nil {
58-
// If we hit a token too long error, fall back to byte-by-byte reading
59-
// with line truncation to handle extremely long lines gracefully
60-
if err == bufio.ErrTooLong {
61-
return processWithLongLineHandling(httpResp.Body, lines, validLines, totalLines, writeIndex, maxJobLogLines)
62-
}
63-
return "", 0, httpResp, fmt.Errorf("failed to read log content: %w", err)
64-
}
65-
66-
var result []string
67-
linesInBuffer := totalLines
68-
if linesInBuffer > maxJobLogLines {
69-
linesInBuffer = maxJobLogLines
70-
}
71-
72-
startIndex := 0
73-
if totalLines > maxJobLogLines {
74-
startIndex = writeIndex
75-
}
76-
77-
for i := 0; i < linesInBuffer; i++ {
78-
idx := (startIndex + i) % maxJobLogLines
79-
if validLines[idx] {
80-
result = append(result, lines[idx])
81-
}
82-
}
83-
84-
return strings.Join(result, "\n"), totalLines, httpResp, nil
85-
}
86-
87-
// processWithLongLineHandling continues processing after encountering a line
88-
// that exceeds the scanner's max token size. It reads byte-by-byte and
89-
// truncates extremely long lines instead of failing.
90-
func processWithLongLineHandling(body io.Reader, lines []string, validLines []bool, totalLines, writeIndex, maxJobLogLines int) (string, int, *http.Response, error) {
91-
// Add a marker that we encountered truncated content
92-
truncatedMarker := "[LINE TRUNCATED - exceeded maximum line length of 10MB]"
93-
lines[writeIndex] = truncatedMarker
94-
validLines[writeIndex] = true
95-
totalLines++
96-
writeIndex = (writeIndex + 1) % maxJobLogLines
97-
98-
// Continue reading with a buffered reader, truncating long lines
99-
reader := bufio.NewReader(body)
47+
readBuf := make([]byte, readBufferSize)
10048
var currentLine strings.Builder
101-
const maxDisplayLength = 1000 // Keep first 1000 chars of truncated lines
49+
lineTruncated := false
10250

10351
for {
104-
b, err := reader.ReadByte()
52+
n, err := httpResp.Body.Read(readBuf)
53+
if n > 0 {
54+
chunk := readBuf[:n]
55+
for len(chunk) > 0 {
56+
// Find the next newline in the chunk
57+
newlineIdx := bytes.IndexByte(chunk, '\n')
58+
59+
if newlineIdx >= 0 {
60+
// Found a newline - complete the current line
61+
if !lineTruncated {
62+
remaining := maxLineSize - currentLine.Len()
63+
if remaining > newlineIdx {
64+
remaining = newlineIdx
65+
}
66+
if remaining > 0 {
67+
currentLine.Write(chunk[:remaining])
68+
}
69+
if currentLine.Len() >= maxLineSize {
70+
lineTruncated = true
71+
}
72+
}
73+
74+
// Store the completed line
75+
line := currentLine.String()
76+
if lineTruncated {
77+
// Only keep first maxDisplayLength chars for truncated lines
78+
if len(line) > maxDisplayLength {
79+
line = line[:maxDisplayLength]
80+
}
81+
line += "... [TRUNCATED]"
82+
}
83+
lines[writeIndex] = line
84+
validLines[writeIndex] = true
85+
totalLines++
86+
writeIndex = (writeIndex + 1) % maxJobLogLines
87+
88+
// Reset for next line
89+
currentLine.Reset()
90+
lineTruncated = false
91+
chunk = chunk[newlineIdx+1:]
92+
} else {
93+
// No newline in remaining chunk - accumulate if not truncated
94+
if !lineTruncated {
95+
remaining := maxLineSize - currentLine.Len()
96+
if remaining > len(chunk) {
97+
remaining = len(chunk)
98+
}
99+
if remaining > 0 {
100+
currentLine.Write(chunk[:remaining])
101+
}
102+
if currentLine.Len() >= maxLineSize {
103+
lineTruncated = true
104+
}
105+
}
106+
break
107+
}
108+
}
109+
}
110+
105111
if err == io.EOF {
106112
// Handle final line without newline
107113
if currentLine.Len() > 0 {
108114
line := currentLine.String()
109-
if len(line) > maxLineSize {
110-
line = line[:maxDisplayLength] + "... [TRUNCATED]"
115+
if lineTruncated {
116+
if len(line) > maxDisplayLength {
117+
line = line[:maxDisplayLength]
118+
}
119+
line += "... [TRUNCATED]"
111120
}
112121
lines[writeIndex] = line
113122
validLines[writeIndex] = true
@@ -116,22 +125,7 @@ func processWithLongLineHandling(body io.Reader, lines []string, validLines []bo
116125
break
117126
}
118127
if err != nil {
119-
return "", 0, nil, fmt.Errorf("failed to read log content: %w", err)
120-
}
121-
122-
if b == '\n' {
123-
line := currentLine.String()
124-
if len(line) > maxLineSize {
125-
line = line[:maxDisplayLength] + "... [TRUNCATED]"
126-
}
127-
lines[writeIndex] = line
128-
validLines[writeIndex] = true
129-
totalLines++
130-
writeIndex = (writeIndex + 1) % maxJobLogLines
131-
currentLine.Reset()
132-
} else if currentLine.Len() < maxLineSize+maxDisplayLength {
133-
// Stop accumulating bytes once we exceed the limit (plus buffer for truncation message)
134-
currentLine.WriteByte(b)
128+
return "", 0, httpResp, fmt.Errorf("failed to read log content: %w", err)
135129
}
136130
}
137131

@@ -153,5 +147,5 @@ func processWithLongLineHandling(body io.Reader, lines []string, validLines []bo
153147
}
154148
}
155149

156-
return strings.Join(result, "\n"), totalLines, nil, nil
150+
return strings.Join(result, "\n"), totalLines, httpResp, nil
157151
}

pkg/buffer/buffer_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package buffer
22

33
import (
4+
"fmt"
45
"io"
56
"net/http"
67
"strings"
@@ -76,4 +77,67 @@ func TestProcessResponseAsRingBufferToEnd(t *testing.T) {
7677
assert.Contains(t, result, "start")
7778
assert.Contains(t, result, "end")
7879
})
80+
81+
t.Run("ring buffer with long line in middle of many lines", func(t *testing.T) {
82+
// Create many lines with a long line in the middle
83+
// Ring buffer size is 5, so we should only keep the last 5 lines
84+
var sb strings.Builder
85+
for i := 1; i <= 10; i++ {
86+
sb.WriteString(fmt.Sprintf("line%d\n", i))
87+
}
88+
// Insert an 11MB line (exceeds maxLineSize of 10MB)
89+
longLine := strings.Repeat("x", 11*1024*1024)
90+
sb.WriteString(longLine)
91+
sb.WriteString("\n")
92+
for i := 11; i <= 20; i++ {
93+
sb.WriteString(fmt.Sprintf("line%d\n", i))
94+
}
95+
96+
resp := &http.Response{
97+
Body: io.NopCloser(strings.NewReader(sb.String())),
98+
}
99+
100+
result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 5)
101+
if respOut != nil && respOut.Body != nil {
102+
defer respOut.Body.Close()
103+
}
104+
require.NoError(t, err)
105+
// 10 lines before + 1 long line + 10 lines after = 21 total
106+
assert.Equal(t, 21, totalLines)
107+
// Should only have the last 5 lines (line16 through line20)
108+
assert.Contains(t, result, "line16")
109+
assert.Contains(t, result, "line17")
110+
assert.Contains(t, result, "line18")
111+
assert.Contains(t, result, "line19")
112+
assert.Contains(t, result, "line20")
113+
// Should NOT contain earlier lines
114+
assert.NotContains(t, result, "line1\n")
115+
assert.NotContains(t, result, "line10\n")
116+
// The truncated line should not be in the last 5
117+
assert.NotContains(t, result, "TRUNCATED")
118+
})
119+
120+
t.Run("ring buffer keeps truncated line when in last N", func(t *testing.T) {
121+
// Long line followed by only 2 more lines, with ring buffer size 5
122+
longLine := strings.Repeat("y", 11*1024*1024)
123+
body := "line1\nline2\nline3\n" + longLine + "\nlineA\nlineB\n"
124+
resp := &http.Response{
125+
Body: io.NopCloser(strings.NewReader(body)),
126+
}
127+
128+
result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 5)
129+
if respOut != nil && respOut.Body != nil {
130+
defer respOut.Body.Close()
131+
}
132+
require.NoError(t, err)
133+
assert.Equal(t, 6, totalLines)
134+
// Last 5: line2, line3, truncated, lineA, lineB
135+
assert.Contains(t, result, "line2")
136+
assert.Contains(t, result, "line3")
137+
assert.Contains(t, result, "TRUNCATED")
138+
assert.Contains(t, result, "lineA")
139+
assert.Contains(t, result, "lineB")
140+
// line1 should be rotated out
141+
assert.NotContains(t, result, "line1")
142+
})
79143
}

0 commit comments

Comments
 (0)