Skip to content

Commit a9b8e2e

Browse files
authored
Add support for Readable#discard_until. (#7)
1 parent fb9f443 commit a9b8e2e

File tree

2 files changed

+140
-5
lines changed

2 files changed

+140
-5
lines changed

lib/io/stream/readable.rb

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,28 @@ def readpartial(size = nil)
116116
# @parameter offset [Integer] The offset to start searching from.
117117
# @parameter limit [Integer | Nil] The maximum number of bytes to read while searching.
118118
# @returns [Integer | Nil] The index of the pattern, or nil if not found.
119-
private def index_of(pattern, offset, limit)
119+
private def index_of(pattern, offset, limit, discard = false)
120120
# We don't want to split on the pattern, so we subtract the size of the pattern.
121121
split_offset = pattern.bytesize - 1
122-
122+
123123
until index = @read_buffer.index(pattern, offset)
124124
offset = @read_buffer.bytesize - split_offset
125125

126126
offset = 0 if offset < 0
127127

128-
return nil if limit and offset >= limit
129-
return nil unless fill_read_buffer
128+
if limit and offset >= limit
129+
return nil
130+
end
131+
132+
unless fill_read_buffer
133+
return nil
134+
end
135+
136+
if discard
137+
# If we are discarding, we should consume the read buffer up to the offset:
138+
consume_read_buffer(offset)
139+
offset = 0
140+
end
130141
end
131142

132143
return index
@@ -136,7 +147,8 @@ def readpartial(size = nil)
136147
# @parameter pattern [String] The pattern to match.
137148
# @parameter offset [Integer] The offset to start searching from.
138149
# @parameter limit [Integer] The maximum number of bytes to read, including the pattern (even if chomped).
139-
# @returns [String | Nil] The contents of the stream up until the pattern, which is consumed but not returned.
150+
# @parameter chomp [Boolean] Whether to remove the pattern from the returned data.
151+
# @returns [String | Nil] The contents of the stream up until the pattern, or nil if the pattern was not found.
140152
def read_until(pattern, offset = 0, limit: nil, chomp: true)
141153
if index = index_of(pattern, offset, limit)
142154
return nil if limit and index >= limit
@@ -149,6 +161,28 @@ def read_until(pattern, offset = 0, limit: nil, chomp: true)
149161
end
150162
end
151163

164+
# Efficiently discard data from the stream until encountering pattern.
165+
# @parameter pattern [String] The pattern to match.
166+
# @parameter offset [Integer] The offset to start searching from.
167+
# @parameter limit [Integer] The maximum number of bytes to read, including the pattern.
168+
# @returns [String | Nil] The contents of the stream up until the pattern, or nil if the pattern was not found.
169+
def discard_until(pattern, offset = 0, limit: nil)
170+
if index = index_of(pattern, offset, limit, true)
171+
@read_buffer.freeze
172+
173+
if limit and index >= limit
174+
@read_buffer = @read_buffer.byteslice(limit, @read_buffer.bytesize)
175+
176+
return nil
177+
end
178+
179+
matched = @read_buffer.byteslice(0, index+pattern.bytesize)
180+
@read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)
181+
182+
return matched
183+
end
184+
end
185+
152186
# Peek at data in the buffer without consuming it.
153187
# @parameter size [Integer | Nil] The number of bytes to peek at. If nil, peek at all available data.
154188
# @returns [String] The data in the buffer without consuming it.

test/io/stream/buffered.rb

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,107 @@ def before
438438
expect(write_buffer).to be(:empty?)
439439
end
440440
end
441+
442+
with "#discard_until" do
443+
it "can discard data until pattern" do
444+
server.write("hello\nworld\ntest")
445+
server.close
446+
447+
# Discard until "\n" - should return chunk ending with the pattern
448+
chunk = client.discard_until("\n")
449+
expect(chunk).not.to be_nil
450+
expect(chunk).to be(:end_with?, "\n")
451+
# Read the remaining data to verify it starts with "world"
452+
expect(client.read(5)).to be == "world"
453+
454+
# Discard until "t" - should return chunk ending with the pattern
455+
chunk = client.discard_until("t")
456+
expect(chunk).not.to be_nil
457+
expect(chunk).to be(:end_with?, "t")
458+
# Read remaining data
459+
expect(client.read).to be == "est"
460+
end
461+
462+
it "returns nil when pattern not found and discards all data" do
463+
server.write("hello world")
464+
server.close
465+
466+
expect(client.discard_until("\n")).to be_nil
467+
# Data should still be available since pattern was not found
468+
expect(client.read).to be == "hello world"
469+
end
470+
471+
it "can discard with a limit" do
472+
server.write("hello\nworld\n")
473+
server.close
474+
475+
# Use peek to verify initial buffer state
476+
expect(client.peek).to be == "hello\nworld\n"
477+
478+
# Limit too small to find pattern - discards up to limit
479+
expect(client.discard_until("\n", limit: 4)).to be_nil
480+
481+
# Use peek to verify that 4 bytes were discarded
482+
expect(client.peek).to be == "o\nworld\n"
483+
484+
# After discarding 4 bytes, should find pattern in remaining data
485+
chunk = client.discard_until("\n", limit: 5)
486+
expect(chunk).not.to be_nil
487+
expect(chunk).to be(:end_with?, "\n")
488+
489+
# Use peek to verify final buffer state
490+
expect(client.peek).to be == "world\n"
491+
expect(client.read).to be == "world\n"
492+
end
493+
494+
it "handles patterns spanning buffer boundaries" do
495+
# Use a small block size to force the pattern to span boundaries
496+
client.block_size = 3
497+
498+
server.write("ab")
499+
server.flush
500+
server.write("cdef")
501+
server.close
502+
503+
# Pattern "cd" spans the boundary between "ab" and "cdef"
504+
chunk = client.discard_until("cd")
505+
expect(chunk).not.to be_nil
506+
expect(chunk).to be(:end_with?, "cd")
507+
expect(client.read).to be == "ef"
508+
end
509+
510+
it "handles large patterns efficiently" do
511+
large_pattern = "X" * 20 # Trigger sliding window logic
512+
server.write("some data before")
513+
server.write(large_pattern)
514+
server.write("some data after")
515+
server.close
516+
517+
chunk = client.discard_until(large_pattern)
518+
expect(chunk).not.to be_nil
519+
expect(chunk).to be(:end_with?, large_pattern)
520+
expect(client.read).to be == "some data after"
521+
end
522+
523+
with "with 1-byte block size" do
524+
it "can discard data with a multi-byte pattern" do
525+
server.write("hello\nworld\n")
526+
server.close
527+
528+
client.block_size = 1
529+
530+
chunk1 = client.discard_until("\n")
531+
expect(chunk1).not.to be_nil
532+
expect(chunk1).to be(:end_with?, "\n")
533+
534+
chunk2 = client.discard_until("\n")
535+
expect(chunk2).not.to be_nil
536+
expect(chunk2).to be(:end_with?, "\n")
537+
538+
expect(client.discard_until("\n")).to be_nil
539+
end
540+
end
541+
end
441542
end
442543

443544
ABidirectionalStream = Sus::Shared("a bidirectional stream") do

0 commit comments

Comments
 (0)