Skip to content

Commit

Permalink
Fix HashFunction impl
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Oct 9, 2024
1 parent 02a1ee2 commit 84fcbae
Showing 1 changed file with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,44 +29,51 @@ public interface UpdateFunction : AutoCloseable {
}
}

// TODO: add tests for this changes
@OptIn(UnsafeIoApi::class)
private class UpdatingSource(
private val source: RawSource,
private val function: UpdateFunction,
) : RawSource {
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
val result = source.readAtMostTo(sink, byteCount)
if (result != -1L) {
@OptIn(UnsafeIoApi::class)
UnsafeBufferOperations.iterate(sink, sink.size - result) { context, head, _ ->
val bytesRead = source.readAtMostTo(sink, byteCount)
if (bytesRead != -1L) {
val sinkOffset = sink.size - bytesRead
UnsafeBufferOperations.iterate(sink, sinkOffset) { context, head, headOffset ->
var segment = head
// needed only for head segment
var additionalOffset = (sinkOffset - headOffset).toInt()
while (segment != null) {
context.withData(segment, function::update)
context.withData(segment) { data, startIndex, endIndex ->
function.update(data, startIndex + additionalOffset, endIndex)
}
segment = context.next(segment)
additionalOffset = 0
}
}
}
return result
return bytesRead
}

override fun close(): Unit = source.close()
}

@OptIn(UnsafeIoApi::class)
private class UpdatingSink(
private val sink: RawSink,
private val function: UpdateFunction,
) : RawSink {
override fun write(source: Buffer, byteCount: Long) {
source.require(byteCount)

@OptIn(UnsafeIoApi::class)
UnsafeBufferOperations.iterate(source) { context, head ->
var consumedCount = 0L
var remaining = byteCount
var segment = head
while (segment != null && consumedCount < byteCount) {
while (segment != null && remaining > 0) {
context.withData(segment) { bytes, startIndex, endIndex ->
val toUpdate = minOf(byteCount - consumedCount, (endIndex - startIndex).toLong()).toInt()
val toUpdate = minOf(remaining, (endIndex - startIndex).toLong()).toInt()
function.update(bytes, startIndex, startIndex + toUpdate)
consumedCount += toUpdate
remaining -= toUpdate
}
segment = context.next(segment)
}
Expand Down

0 comments on commit 84fcbae

Please sign in to comment.