Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ByteChannelContentSource #11910

Merged
merged 14 commits into from
Jun 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.eclipse.jetty.util.BufferUtil;

Expand Down Expand Up @@ -92,6 +93,40 @@ public void clear()
}
}

/**
* A ByteBufferPool for a specific size and type of buffer
*/
class Sized extends Wrapper
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
private final boolean _direct;
private final int _size;

public Sized(ByteBufferPool wrapped, boolean direct, int size)
{
super(Objects.requireNonNullElse(wrapped, NON_POOLING));
_direct = direct;
_size = size;
}

public boolean isDirect()
{
return _direct;
}

public int getSize()
{
return _size;
}

/**
* @return A {@link RetainableByteBuffer} suitable for the specified size and type.
*/
public RetainableByteBuffer acquire()
{
return getWrapped().acquire(_size, _direct);
}
}

/**
* <p>A {@link ByteBufferPool} that does not pool its
* {@link RetainableByteBuffer}s.</p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.io.content;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;
import java.util.Objects;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;

/**
* <p>A {@link Content.Source} backed by a {@link ByteChannel}.
* Any calls to {@link #demand(Runnable)} are immediately satisfied.</p>
*/
public class ByteChannelContentSource implements Content.Source
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker _invoker = new SerializedInvoker();
private final ByteBufferPool.Sized _byteBufferPool;
private final ByteChannel _byteChannel;
private final long _offset;
private final long _length;
private RetainableByteBuffer _buffer;
private long _totalRead;
private Runnable demandCallback;
private Content.Chunk _terminal;

public ByteChannelContentSource(SeekableByteChannel seekableByteChannel, long offset, long length) throws IOException
{
this(null, false, 4096, seekableByteChannel, offset, length);
}

public ByteChannelContentSource(ByteBufferPool byteBufferPool, boolean direct, int bufferSize, SeekableByteChannel seekableByteChannel, long offset, long length) throws IOException
{
this(new ByteBufferPool.Sized(byteBufferPool, direct, bufferSize), seekableByteChannel, offset, length);
}

public ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, SeekableByteChannel seekableByteChannel, long offset, long length) throws IOException
{
this(byteBufferPool, (ByteChannel)seekableByteChannel, offset, length);
if (offset > 0)
seekableByteChannel.position(offset);
}

public ByteChannelContentSource(ByteChannel byteChannel)
{
this(new ByteBufferPool.Sized(null, false, 4096), byteChannel, -1L, -1L);
}

public ByteChannelContentSource(ByteBufferPool byteBufferPool, boolean direct, int bufferSize, ByteChannel byteChannel)
{
this(new ByteBufferPool.Sized(byteBufferPool, direct, bufferSize), byteChannel, -1L, -1L);
}

public ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, int bufferSize, ByteChannel byteChannel)
{
this(byteBufferPool, byteChannel, -1L, -1L);
}

private ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel, long offset, long length)
{
_byteBufferPool = Objects.requireNonNull(byteBufferPool);
_byteChannel = byteChannel;
_offset = offset;
_length = length;
}

@Override
public void demand(Runnable demandCallback)
{
try (AutoLock ignored = lock.lock())
{
if (this.demandCallback != null)
throw new IllegalStateException("demand pending");
this.demandCallback = demandCallback;
}
_invoker.run(this::invokeDemandCallback);
}

private void invokeDemandCallback()
{
Runnable demandCallback;
try (AutoLock ignored = lock.lock())
{
demandCallback = this.demandCallback;
this.demandCallback = null;
}
if (demandCallback != null)
ExceptionUtil.run(demandCallback, this::fail);
}

@Override
public Content.Chunk read()
{
try (AutoLock ignored = lock.lock())
{
if (_terminal != null)
return _terminal;

if (!_byteChannel.isOpen())
{
if (_buffer != null)
{
_buffer.release();
_buffer = null;
}
_terminal = Content.Chunk.from(new ClosedChannelException(), true);
return _terminal;
}

if (_buffer == null)
{
_buffer = _byteBufferPool.acquire();
}
else if (_buffer.isRetained())
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
_buffer.release();
_buffer = _byteBufferPool.acquire();
}

try
{
ByteBuffer byteBuffer = _buffer.getByteBuffer();
BufferUtil.clearToFill(byteBuffer);
if (_length >= 0)
byteBuffer.limit((int)Math.min(_buffer.capacity(), _length - _totalRead));
int read = _byteChannel.read(byteBuffer);
BufferUtil.flipToFlush(byteBuffer, 0);
if (read == 0)
return null;
if (read > 0)
{
_buffer.retain();
_totalRead += read;
if (_length >= 0 && _totalRead < _length)
return Content.Chunk.asChunk(byteBuffer, false, _buffer);

_terminal = Content.Chunk.EOF;
IO.close(_byteChannel);
Content.Chunk last = Content.Chunk.asChunk(byteBuffer, true, _buffer);
_buffer = null;
return last;
}
_terminal = Content.Chunk.EOF;
IO.close(_byteChannel);
}
catch (Throwable t)
{
_terminal = Content.Chunk.from(t, true);
gregw marked this conversation as resolved.
Show resolved Hide resolved
}
}
return _terminal;
}

@Override
public void fail(Throwable failure)
{
try (AutoLock ignored = lock.lock())
{
if (_terminal == null)
_terminal = Content.Chunk.from(failure, true);
else
ExceptionUtil.addSuppressedIfNotAssociated(_terminal.getFailure(), failure);
IO.close(_byteChannel);
if (_buffer != null)
{
_buffer.release();
_buffer = null;
}
}
}

@Override
public long getLength()
{
return _length;
}

@Override
public boolean rewind()
{
try (AutoLock ignored = lock.lock())
{
if (_offset >= 0 && _byteChannel instanceof SeekableByteChannel seekableByteChannel)
{
try
{
seekableByteChannel.position(_offset);
_totalRead = 0;
return true;
}
catch (IOException e)
{
// ignore;
}
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Objects;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;

Expand Down Expand Up @@ -111,19 +112,7 @@ private void invokeDemandCallback()
this.demandCallback = null;
}
if (demandCallback != null)
runDemandCallback(demandCallback);
}

private void runDemandCallback(Runnable demandCallback)
{
try
{
demandCallback.run();
}
catch (Throwable x)
{
fail(x);
}
ExceptionUtil.run(demandCallback, this::fail);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Objects;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.thread.SerializedInvoker;

/**
Expand Down Expand Up @@ -114,19 +115,7 @@ private void invokeDemandCallback()
Runnable demandCallback = this.demandCallback;
this.demandCallback = null;
if (demandCallback != null)
runDemandCallback(demandCallback);
}

private void runDemandCallback(Runnable demandCallback)
{
try
{
demandCallback.run();
}
catch (Throwable x)
{
fail(x);
}
ExceptionUtil.run(demandCallback, this::fail);
}

private Content.Chunk process(Content.Chunk rawChunk)
Expand Down
Loading
Loading