Skip to content

SegmentPool improvements #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/api/kotlinx-io-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public abstract interface class kotlinx/io/RawSource : java/lang/AutoCloseable {
}

public final class kotlinx/io/Segment {
public synthetic fun <init> ([BIIZZLkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun <init> ([BIILkotlinx/io/SegmentCopyTracker;ZLkotlin/jvm/internal/DefaultConstructorMarker;)V
public final synthetic fun dataAsByteArray (Z)[B
public final synthetic fun getLimit ()I
public final synthetic fun getNext ()Lkotlinx/io/Segment;
Expand Down
75 changes: 66 additions & 9 deletions core/common/src/Segment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,47 @@ package kotlinx.io
import kotlin.jvm.JvmField
import kotlin.jvm.JvmSynthetic

/**
* Tracks shared segment copies.
*
* A new [SegmentCopyTracker] instance should be not shared by default (i.e. `shared == false`).
* Any further [addCopy] calls should move the tracker to a shared state (i.e. `shared == true`).
* Once a shared segment copy is recycled, [removeCopy] should be called.
* Depending on implementation, calling [removeCopy] the same number of times as [addCopy] may
* or may not transition the tracked back to unshared stated.
*
* The class is not intended for public use and currently designed to fit the only use case - within JVM SegmentPool
* implementation.
*/
internal abstract class SegmentCopyTracker {
/**
* `true` if a tracker shared by multiple segment copies.
*/
abstract val shared: Boolean

/**
* Track a new copy created by sharing an associated segment.
*/
abstract fun addCopy()

/**
* Records reclamation of a shared segment copy associated with this tracker.
* If a tracker was in unshared state, this call should not affect an internal state.
*
* @return `true` if the segment was not shared *before* this called.
*/
abstract fun removeCopy(): Boolean
}

/**
* Simple [SegmentCopyTracker] that always reports shared state.
*/
internal object AlwaysSharedCopyTracker : SegmentCopyTracker() {
override val shared: Boolean = true
override fun addCopy() = Unit
override fun removeCopy(): Boolean = true
}

/**
* A segment of a buffer.
*
Expand Down Expand Up @@ -59,8 +100,17 @@ public class Segment {
internal var limit: Int = 0

/** True if other segments or byte strings use the same byte array. */
@JvmField
internal var shared: Boolean = false
internal val shared: Boolean
get() = copyTracker?.shared ?: false

/**
* Tracks number shared copies
*
* Note that this reference is not `@Volatile` as segments are not thread-safe and it's an error
* to modify the same segment concurrently.
* At the same time, an object [copyTracker] refers to could be modified concurrently.
*/
internal var copyTracker: SegmentCopyTracker? = null

/** True if this segment owns the byte array and can append to it, extending `limit`. */
@JvmField
Expand All @@ -79,14 +129,14 @@ public class Segment {
private constructor() {
this.data = ByteArray(SIZE)
this.owner = true
this.shared = false
this.copyTracker = null
}

private constructor(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean) {
private constructor(data: ByteArray, pos: Int, limit: Int, shareToken: SegmentCopyTracker?, owner: Boolean) {
this.data = data
this.pos = pos
this.limit = limit
this.shared = shared
this.copyTracker = shareToken
this.owner = owner
}

Expand All @@ -96,8 +146,10 @@ public class Segment {
* prevents it from being pooled.
*/
internal fun sharedCopy(): Segment {
shared = true
return Segment(data, pos, limit, true, false)
val t = copyTracker ?: SegmentPool.tracker().also {
copyTracker = it
}
return Segment(data, pos, limit, t.also { it.addCopy() }, false)
}

/**
Expand Down Expand Up @@ -250,8 +302,13 @@ public class Segment {
internal fun new(): Segment = Segment()

@JvmSynthetic
internal fun new(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean): Segment
= Segment(data, pos, limit, shared, owner)
internal fun new(
data: ByteArray,
pos: Int,
limit: Int,
copyTracker: SegmentCopyTracker?,
owner: Boolean
): Segment = Segment(data, pos, limit, copyTracker, owner)
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/common/src/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ internal expect object SegmentPool {

/** Recycle a segment that the caller no longer needs. */
fun recycle(segment: Segment)

/**
* Allocates a new copy tracker that'll be associated with a segment from this pool.
* For performance reasons, there's no tracker attached to a segment initially.
* Instead, it's allocated lazily on the first sharing attempt.
*/
fun tracker(): SegmentCopyTracker
}
3 changes: 2 additions & 1 deletion core/common/src/unsafe/UnsafeBufferOperations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public object UnsafeBufferOperations {
public fun moveToTail(buffer: Buffer, bytes: ByteArray, startIndex: Int = 0, endIndex: Int = bytes.size) {
checkBounds(bytes.size, startIndex, endIndex)
val segment = Segment.new(
bytes, startIndex, endIndex, shared = true /* to prevent recycling */,
bytes, startIndex, endIndex,
AlwaysSharedCopyTracker, /* to prevent recycling */
owner = false /* can't append to it */
)
val tail = buffer.tail
Expand Down
26 changes: 26 additions & 0 deletions core/common/test/AlwaysSharedCopyTrackerTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file.
*/

package kotlinx.io

import kotlin.test.Test
import kotlin.test.assertTrue

class AlwaysSharedCopyTrackerTest {
@Test
fun stateTransition() {
val tracker = AlwaysSharedCopyTracker
assertTrue(tracker.shared)

assertTrue(tracker.removeCopy())
assertTrue(tracker.shared)

tracker.addCopy()
assertTrue(tracker.shared)

assertTrue(tracker.removeCopy())
assertTrue(tracker.shared)
}
}
2 changes: 2 additions & 0 deletions core/js/src/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ internal actual object SegmentPool {

actual fun recycle(segment: Segment) {
}

actual fun tracker(): SegmentCopyTracker = AlwaysSharedCopyTracker
}
Loading