Skip to content

[PR 1/5] Organize Buffer's segments as a regular list #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions core/apple/src/AppleCore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ private open class OutputStreamSink(
source.size -= bytesWritten

if (head.pos == head.limit) {
source.head = head.pop()
SegmentPool.recycle(head)
source.recycleHead()
}
}
}
Expand Down Expand Up @@ -101,8 +100,7 @@ private open class NSInputStreamSource(
if (bytesRead == 0L) {
if (tail.pos == tail.limit) {
// We allocated a tail segment, but didn't end up needing it. Recycle!
sink.head = tail.pop()
SegmentPool.recycle(tail)
sink.recycleTail()
}
return -1
}
Expand Down
9 changes: 5 additions & 4 deletions core/apple/src/BuffersApple.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
package kotlinx.io

import kotlinx.cinterop.*
import platform.Foundation.*
import platform.Foundation.NSData
import platform.Foundation.create
import platform.Foundation.data
import platform.darwin.NSUIntegerMax
import platform.posix.*

Expand Down Expand Up @@ -44,8 +46,7 @@ internal fun Buffer.readAtMostTo(sink: CPointer<uint8_tVar>, maxLength: Int): In
size -= toCopy.toLong()

if (s.pos == s.limit) {
head = s.pop()
SegmentPool.recycle(s)
recycleHead()
}

return toCopy
Expand All @@ -70,6 +71,6 @@ internal fun Buffer.snapshotAsNSData(): NSData {
}
curr = curr.next
index += length
} while (curr !== head)
} while (curr != null)
return NSData.create(bytesNoCopy = bytes, length = size.convert())
}
144 changes: 97 additions & 47 deletions core/common/src/Buffer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class Buffer : Source, Sink {
@JvmField
internal var head: Segment? = null

@JvmField
internal var tail: Segment? = null

/**
* The number of bytes accessible for read from this buffer.
*/
Expand Down Expand Up @@ -76,8 +79,7 @@ public class Buffer : Source, Sink {
val b = data[pos++]
size -= 1L
if (pos == limit) {
head = segment.pop()
SegmentPool.recycle(segment)
recycleHead()
} else {
segment.pos = pos
}
Expand All @@ -102,8 +104,7 @@ public class Buffer : Source, Sink {
size -= 2L

if (pos == limit) {
head = segment.pop()
SegmentPool.recycle(segment)
recycleHead()
} else {
segment.pos = pos
}
Expand Down Expand Up @@ -138,8 +139,7 @@ public class Buffer : Source, Sink {
size -= 4L

if (pos == limit) {
head = segment.pop()
SegmentPool.recycle(segment)
recycleHead()
} else {
segment.pos = pos
}
Expand Down Expand Up @@ -176,8 +176,7 @@ public class Buffer : Source, Sink {
size -= 8L

if (pos == limit) {
head = segment.pop()
SegmentPool.recycle(segment)
recycleHead()
} else {
segment.pos = pos
}
Expand Down Expand Up @@ -241,13 +240,7 @@ public class Buffer : Source, Sink {
val copy = s!!.sharedCopy()
copy.pos += currentOffset.toInt()
copy.limit = minOf(copy.pos + remainingByteCount.toInt(), copy.limit)
if (out.head == null) {
copy.prev = copy
copy.next = copy.prev
out.head = copy.next
} else {
out.head!!.prev!!.push(copy)
}
out.pushSegment(copy)
remainingByteCount -= (copy.limit - copy.pos).toLong()
currentOffset = 0L
s = s.next
Expand All @@ -264,7 +257,7 @@ public class Buffer : Source, Sink {
if (result == 0L) return 0L

// Omit the tail if it's still writable.
val tail = head!!.prev!!
val tail = tail!!
if (tail.limit < Segment.SIZE && tail.owner) {
result -= (tail.limit - tail.pos).toLong()
}
Expand Down Expand Up @@ -317,8 +310,7 @@ public class Buffer : Source, Sink {
head.pos += toSkip

if (head.pos == head.limit) {
this.head = head.pop()
SegmentPool.recycle(head)
recycleHead()
}
}
}
Expand All @@ -336,8 +328,7 @@ public class Buffer : Source, Sink {
size -= toCopy.toLong()

if (s.pos == s.limit) {
head = s.pop()
SegmentPool.recycle(s)
recycleHead()
}

return toCopy
Expand Down Expand Up @@ -377,19 +368,20 @@ public class Buffer : Source, Sink {
internal fun writableSegment(minimumCapacity: Int): Segment {
require(minimumCapacity >= 1 && minimumCapacity <= Segment.SIZE) { "unexpected capacity" }

if (head == null) {
if (tail == null) {
val result = SegmentPool.take() // Acquire a first segment.
head = result
result.prev = result
result.next = result
tail = result
return result
}

var tail = head!!.prev
if (tail!!.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()) // Append a new empty segment to fill up.
val t = tail!!
if (t.limit + minimumCapacity > Segment.SIZE || !t.owner) {
val newTail = t.push(SegmentPool.take()) // Append a new empty segment to fill up.
tail = newTail
return newTail
}
return tail
return t
}

override fun write(source: ByteArray, startIndex: Int, endIndex: Int) {
Expand Down Expand Up @@ -486,7 +478,7 @@ public class Buffer : Source, Sink {
while (remainingByteCount > 0L) {
// Is a prefix of the source's head segment all that we need to move?
if (remainingByteCount < source.head!!.limit - source.head!!.pos) {
val tail = if (head != null) head!!.prev else null
val tail = tail
if (tail != null && tail.owner &&
remainingByteCount + tail.limit - (if (tail.shared) 0 else tail.pos) <= Segment.SIZE
) {
Expand All @@ -498,23 +490,22 @@ public class Buffer : Source, Sink {
} else {
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
source.head = source.head!!.split(remainingByteCount.toInt())
val newHead = source.head!!.split(remainingByteCount.toInt())
if (source.head == source.tail) {
source.tail = newHead
}
source.head = newHead
}
}

// Remove the source's head segment and append it to our tail.
val segmentToMove = source.head
val movedByteCount = (segmentToMove!!.limit - segmentToMove.pos).toLong()
source.head = segmentToMove.pop()
if (head == null) {
head = segmentToMove
segmentToMove.prev = segmentToMove
segmentToMove.next = segmentToMove.prev
} else {
var tail = head!!.prev
tail = tail!!.push(segmentToMove)
tail.compact()
if (source.head == null) {
source.tail = null
}
pushSegment(segmentToMove, true)
source.size -= movedByteCount
size += movedByteCount
remainingByteCount -= movedByteCount
Expand Down Expand Up @@ -582,16 +573,15 @@ public class Buffer : Source, Sink {
val result = Buffer()
if (size == 0L) return result

val head = head!!
val head = this.head!!
val headCopy = head.sharedCopy()

result.head = headCopy
headCopy.prev = result.head
headCopy.next = headCopy.prev
result.tail = headCopy

var s = head.next
while (s !== head) {
headCopy.prev!!.push(s!!.sharedCopy())
while (s != null) {
result.tail = result.tail!!.push(s.sharedCopy())
s = s.next
}

Expand Down Expand Up @@ -642,6 +632,63 @@ public class Buffer : Source, Sink {

return "Buffer(size=$size hex=$builder)"
}

/**
* Unlinks and recycles this buffer's head.
*
* If head had a successor, it'll become a new head.
* Otherwise, both [head] and [tail] will be set to null.
*
* It's up to a caller to ensure that the head exists.
*/
internal fun recycleHead() {
val oldHead = head!!
val nextHead = oldHead.next
head = nextHead
if (nextHead == null) {
tail = null
} else {
nextHead.prev = null
}
oldHead.next = null
SegmentPool.recycle(oldHead)
}

/**
* Unlinks and recycles this buffer's tail segment.
*
* If tail had a predecessor, it'll become a new tail.
* Otherwise, both [head] and [tail] will be set to null.
*
* It's up to a caller to ensure that the tail exists.
*/
internal fun recycleTail() {
val oldTail = tail!!
val newTail = oldTail.prev
tail = newTail
if (newTail == null) {
head = null
} else {
newTail.next = null
}
oldTail.prev = null
SegmentPool.recycle(oldTail)
}

@Suppress("NOTHING_TO_INLINE")
private inline fun pushSegment(newTail: Segment, tryCompact: Boolean = false) {
if (head == null) {
head = newTail
tail = newTail
} else if (tryCompact) {
tail = tail!!.push(newTail).compact()
if (tail!!.prev == null) {
head = tail
}
} else {
tail = tail!!.push(newTail)
}
}
}

/**
Expand All @@ -652,23 +699,26 @@ internal inline fun <T> Buffer.seek(
fromIndex: Long,
lambda: (Segment?, Long) -> T
): T {
var s: Segment = head ?: return lambda(null, -1L)
if (this.head == null) lambda(null, -1L)

if (size - fromIndex < fromIndex) {
var s = tail
// We're scanning in the back half of this buffer. Find the segment starting at the back.
var offset = size
while (offset > fromIndex) {
s = s.prev!!
while (s != null && offset > fromIndex) {
offset -= (s.limit - s.pos).toLong()
if (offset <= fromIndex) break
s = s.prev
}
return lambda(s, offset)
} else {
var s = this.head
// We're scanning in the front half of this buffer. Find the segment starting at the front.
var offset = 0L
while (true) {
while (s != null) {
val nextOffset = offset + (s.limit - s.pos)
if (nextOffset > fromIndex) break
s = s.next!!
s = s.next
offset = nextOffset
}
return lambda(s, offset)
Expand Down
9 changes: 5 additions & 4 deletions core/common/src/Buffers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public fun Buffer.snapshot(): ByteString {
check(curr != null) { "Current segment is null" }
append(curr.data, curr.pos, curr.limit)
curr = curr.next
} while (curr !== head)
} while (curr != null)
}
}

Expand Down Expand Up @@ -53,10 +53,11 @@ public fun Buffer.indexOf(byte: Byte, startIndex: Long = 0, endIndex: Long = siz
if (o == -1L) {
return -1L
}
var segment = seg!!
var segment: Segment? = seg!!
var offset = o
do {
check(endOffset > offset)
segment!!
val idx = segment.indexOf(
byte,
// If start index within this segment, the diff will be positive and
Expand All @@ -71,8 +72,8 @@ public fun Buffer.indexOf(byte: Byte, startIndex: Long = 0, endIndex: Long = siz
return offset + idx.toLong()
}
offset += segment.size
segment = segment.next!!
} while (segment !== head && offset < endOffset)
segment = segment.next
} while (segment != null && offset < endOffset)
return -1L
}
}
9 changes: 5 additions & 4 deletions core/common/src/ByteStrings.kt
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long {
if (o == -1L) {
return -1L
}
var segment = seg!!
var segment: Segment? = seg
var offset = o
do {
segment!!
// If start index within this segment, the diff will be positive and
// we'll scan the segment starting from the corresponding offset.
// Otherwise, the diff will be negative and we'll scan the segment from the beginning.
Expand All @@ -147,16 +148,16 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long {
val firstOutboundOffset = maxOf(startOffset, segment.size - byteStringData.size + 1)
// Try to find a pattern in all suffixes shorter than the pattern. These suffixes start
// in the current segment, but ends in the following segments; thus we're using outbound function.
val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset, head)
val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset)
if (idx1 != -1) {
// Offset corresponds to the segment's start, idx - to offset within the segment.
return offset + idx1.toLong()
}

// We scanned the whole segment, so let's go to the next one
offset += segment.size
segment = segment.next!!
} while (segment !== head && offset + byteString.size <= size)
segment = segment.next
} while (segment != null && offset + byteString.size <= size)
return -1L
}
}
Expand Down
Loading