Skip to content

сleanups of close/release inconsistency #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) {
RSocketRequestHandler {
requestResponse {
it.release()
it.close()
payloadCopy()
}
requestStream {
it.release()
it.close()
payloadsFlow
}
requestChannel { init, payloads ->
init.release()
init.close()
payloads.flowOn(requestStrategy)
}
}
Expand All @@ -74,7 +74,7 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
)

override fun releasePayload(payload: Payload) {
payload.release()
payload.close()
}

override suspend fun doRequestResponse(): Payload = client.requestResponse(payloadCopy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ actual class ChatApi(private val rSocket: RSocket, private val proto: ProtoBuf)
actual suspend fun delete(id: Int) {
rSocket.requestResponse(
proto.encodeToPayload(route = "chats.delete", DeleteChat(id))
).release()
).close()
}
}
10 changes: 5 additions & 5 deletions rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,27 @@ import kotlinx.coroutines.flow.*
public interface RSocket : CoroutineScope {

public suspend fun metadataPush(metadata: ByteReadPacket) {
metadata.release()
metadata.close()
notImplemented("Metadata Push")
}

public suspend fun fireAndForget(payload: Payload) {
payload.release()
payload.close()
notImplemented("Fire and Forget")
}

public suspend fun requestResponse(payload: Payload): Payload {
payload.release()
payload.close()
notImplemented("Request Response")
}

public fun requestStream(payload: Payload): Flow<Payload> {
payload.release()
payload.close()
notImplemented("Request Stream")
}

public fun requestChannel(initPayload: Payload, payloads: Flow<Payload>): Flow<Payload> {
initPayload.release()
initPayload.close()
notImplemented("Request Channel")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public class RSocketConnector internal constructor(
connection.sendFrame(setupFrame)
return requester
} catch (cause: Throwable) {
connectionConfig.setupPayload.release()
setupFrame.release()
connectionConfig.setupPayload.close()
setupFrame.close()
connection.cancel("Connection establishment failed", cause)
throw cause
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class RSocketConnectorBuilder internal constructor() {

private companion object {
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor {
config.setupPayload.release()
config.setupPayload.close()
EmptyRSocket()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal class CancelFrame(
override val type: FrameType get() = FrameType.Cancel
override val flags: Int get() = 0

override fun release(): Unit = Unit
override fun close(): Unit = Unit

override fun BytePacketBuilder.writeSelf(): Unit = Unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal class ErrorFrame(
override val flags: Int get() = 0
val errorCode get() = (throwable as? RSocketError)?.errorCode ?: ErrorCode.ApplicationError

override fun release(): Unit = Unit
override fun close(): Unit = Unit

override fun BytePacketBuilder.writeSelf() {
writeInt(errorCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ internal class ExtensionFrame(
override val type: FrameType get() = FrameType.Extension
override val flags: Int get() = if (payload.metadata != null) Flags.Metadata else 0

override fun release() {
payload.release()
override fun close() {
payload.close()
}

override fun BytePacketBuilder.writeSelf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public sealed class Frame : Closeable {
public abstract val streamId: Int
public abstract val flags: Int

internal abstract fun release()

protected abstract fun BytePacketBuilder.writeSelf()
protected abstract fun StringBuilder.appendFlags()
protected abstract fun StringBuilder.appendSelf()
Expand All @@ -54,10 +52,6 @@ public sealed class Frame : Closeable {
append(flag)
if (value) append(1) else append(0)
}

override fun close() {
release()
}
}

internal fun ByteReadPacket.readFrame(pool: ObjectPool<ChunkBuffer>): Frame = use {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ internal class KeepAliveFrame(
override val streamId: Int get() = 0
override val flags: Int get() = if (respond) RespondFlag else 0

override fun release() {
data.release()
override fun close() {
data.close()
}

override fun BytePacketBuilder.writeSelf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ internal class LeaseFrame(
override val streamId: Int get() = 0
override val flags: Int get() = if (metadata != null) Flags.Metadata else 0

override fun release() {
metadata?.release()
override fun close() {
metadata?.close()
}

override fun BytePacketBuilder.writeSelf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ internal class MetadataPushFrame(
override val streamId: Int get() = 0
override val flags: Int get() = Flags.Metadata

override fun release() {
metadata.release()
override fun close() {
metadata.close()
}

override fun BytePacketBuilder.writeSelf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ internal class RequestFrame(
return flags
}

override fun release() {
payload.release()
override fun close() {
payload.close()
}

override fun BytePacketBuilder.writeSelf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal class RequestNFrame(
override val type: FrameType get() = FrameType.RequestN
override val flags: Int get() = 0

override fun release(): Unit = Unit
override fun close(): Unit = Unit

override fun BytePacketBuilder.writeSelf() {
writeInt(requestN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal class ResumeFrame(
override val streamId: Int get() = 0
override val flags: Int get() = 0

override fun release(): Unit = Unit
override fun close(): Unit = Unit

override fun BytePacketBuilder.writeSelf() {
writeVersion(version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal class ResumeOkFrame(
override val streamId: Int get() = 0
override val flags: Int get() = 0

override fun release(): Unit = Unit
override fun close(): Unit = Unit

override fun BytePacketBuilder.writeSelf() {
writeLong(lastReceivedClientPosition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ internal class SetupFrame(
return flags
}

override fun release() {
resumeToken?.release()
payload.release()
override fun close() {
resumeToken?.close()
payload.close()
}

override fun BytePacketBuilder.writeSelf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal inline fun buildPacket(pool: ObjectPool<ChunkBuffer>, block: BytePacket
block(builder)
return builder.build()
} catch (t: Throwable) {
builder.release()
builder.close()
throw t
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ internal suspend inline fun connect(
requestJob.invokeOnCompletion {
prioritizer.close(it)
streamsStorage.cleanup(it)
connectionConfig.setupPayload.release()
connectionConfig.setupPayload.close()
}

val requester = interceptors.wrapRequester(
Expand Down Expand Up @@ -76,8 +76,8 @@ internal suspend inline fun connect(
is MetadataPushFrame -> responder.handleMetadataPush(frame.metadata)
is ErrorFrame -> connection.cancel("Error frame received on 0 stream", frame.throwable)
is KeepAliveFrame -> keepAliveHandler.mark(frame)
is LeaseFrame -> frame.release().also { error("lease isn't implemented") }
else -> frame.release()
is LeaseFrame -> frame.close().also { error("lease isn't implemented") }
else -> frame.close()
}
else -> streamsStorage.handleFrame(frame, responder)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ internal class RSocketRequester(
try {
sender.sendRequestPayload(FrameType.RequestFnF, id, payload)
} catch (cause: Throwable) {
payload.release()
payload.close()
if (isActive) sender.sendCancel(id) //if cancelled during fragmentation
throw cause
}
Expand Down Expand Up @@ -128,7 +128,7 @@ internal class RSocketRequester(
onReceiveComplete()
return result
} catch (cause: Throwable) {
payload.release()
payload.close()
val isCancelled = onReceiveCancelled(cause)
if (isActive && isCancelled) sender.sendCancel(id)
throw cause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal class RSocketResponder(
if (currentCoroutineContext().isActive && isFailed) sender.sendError(id, cause)
throw cause
} finally {
payload.release()
payload.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O
}

fun remove(id: Int): FrameHandler? {
return handlers.remove(id)?.also(FrameHandler::release)
return handlers.remove(id)?.also(FrameHandler::close)
}

fun contains(id: Int): Boolean {
Expand All @@ -44,7 +44,7 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O
handlers.clear()
values.forEach {
it.cleanup(error)
it.release()
it.close()
}
}

Expand All @@ -55,8 +55,8 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O
is CancelFrame -> handlers[id]?.handleCancel()
is ErrorFrame -> handlers[id]?.handleError(frame.throwable)
is RequestFrame -> when {
frame.type == FrameType.Payload -> handlers[id]?.handleRequest(frame) ?: frame.release() // release on unknown stream id
isServer.xor(id % 2 != 0) -> frame.release() // request frame on wrong stream id
frame.type == FrameType.Payload -> handlers[id]?.handleRequest(frame) ?: frame.close() // release on unknown stream id
isServer.xor(id % 2 != 0) -> frame.close() // request frame on wrong stream id
else -> {
val initialRequest = frame.initialRequest
val handler = when (frame.type) {
Expand All @@ -70,7 +70,7 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O
handler.handleRequest(frame)
}
}
else -> frame.release()
else -> frame.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*

internal abstract class FrameHandler(pool: ObjectPool<ChunkBuffer>) {
internal abstract class FrameHandler(pool: ObjectPool<ChunkBuffer>) : Closeable {
private val data = BytePacketBuilder(0, pool)
private val metadata = BytePacketBuilder(0, pool)
protected abstract var hasMetadata: Boolean
Expand Down Expand Up @@ -57,9 +57,9 @@ internal abstract class FrameHandler(pool: ObjectPool<ChunkBuffer>) {

abstract fun cleanup(cause: Throwable?)

fun release() {
data.release()
metadata.release()
override fun close() {
data.close()
metadata.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public fun CompositeMetadata(entries: List<Metadata>): CompositeMetadata =
DefaultCompositeMetadata(entries.map(CompositeMetadata::Entry))

@ExperimentalMetadataApi
public interface CompositeMetadata : Metadata {
public sealed interface CompositeMetadata : Metadata {
public val entries: List<Entry>
override val mimeType: MimeType get() = Reader.mimeType

Expand All @@ -44,6 +44,10 @@ public interface CompositeMetadata : Metadata {
}
}

override fun close() {
entries.forEach { it.content.close() }
}

public class Entry(public val mimeType: MimeType, public val content: ByteReadPacket) {
public constructor(metadata: Metadata) : this(metadata.mimeType, metadata.toPacket())
}
Expand Down
Loading