Skip to content

Commit

Permalink
fstream: improve make_file_input_stream() for a subrange of a file
Browse files Browse the repository at this point in the history
Currently, when Scylla wants to read a subset of a file, it does so
by playing games with buffer_size and hoping no read-ahead takes place.

This is hacky, however, so provide an explicit API that specifies both
the offset within the file and the size of the data to read.  This will
allow us to enable read-ahead without fear.

Message-Id: <1453891684-3982-1-git-send-email-avi@scylladb.com>
  • Loading branch information
avikivity committed Jan 28, 2016
1 parent fbd9b30 commit 6623379
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 10 deletions.
60 changes: 50 additions & 10 deletions core/fstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ class file_data_source_impl : public data_source_impl {
file _file;
file_input_stream_options _options;
uint64_t _pos;
uint64_t _remain;
circular_buffer<future<temporary_buffer<char>>> _read_buffers;
unsigned _reads_in_progress = 0;
std::experimental::optional<promise<>> _done;
public:
file_data_source_impl(file f, uint64_t offset, file_input_stream_options options)
: _file(std::move(f)), _options(options), _pos(offset) {}
file_data_source_impl(file f, uint64_t offset, uint64_t len, file_input_stream_options options)
: _file(std::move(f)), _options(options), _pos(offset), _remain(len) {
// prevent wraparounds
_remain = std::min(std::numeric_limits<uint64_t>::max() - _pos, _remain);
}
virtual future<temporary_buffer<char>> get() override {
if (_read_buffers.empty()) {
issue_read_aheads(1);
Expand All @@ -63,33 +67,69 @@ class file_data_source_impl : public data_source_impl {
}
auto ra = std::max(min_ra, _options.read_ahead);
while (_read_buffers.size() < ra) {
if (!_remain) {
if (_read_buffers.size() >= min_ra) {
return;
}
_read_buffers.push_back(make_ready_future<temporary_buffer<char>>());
continue;
}
++_reads_in_progress;
// if _pos is not dma-aligned, we'll get a short read. Account for that.
auto now = _options.buffer_size - _pos % _file.disk_read_dma_alignment();
_read_buffers.push_back(_file.dma_read_bulk<char>(_pos, now, _options.io_priority_class).then_wrapped(
[this] (future<temporary_buffer<char>> ret) {
// Also avoid reading beyond _remain.
uint64_t align = _file.disk_read_dma_alignment();
auto start = align_down(_pos, align);
auto end = align_up(std::min(start + _options.buffer_size, _pos + _remain), align);
auto len = end - start;
_read_buffers.push_back(_file.dma_read_bulk<char>(start, len, _options.io_priority_class).then_wrapped(
[this, start, end, pos = _pos, remain = _remain] (future<temporary_buffer<char>> ret) {
issue_read_aheads();
--_reads_in_progress;
if (_done && !_reads_in_progress) {
_done->set_value();
}
return ret;
if ((pos == start && end <= pos + remain) || ret.failed()) {
// no games needed
return ret;
} else {
// first or last buffer, need trimming
auto tmp = ret.get0();
auto real_end = start + tmp.size();
if (real_end <= pos) {
return make_ready_future<temporary_buffer<char>>();
}
if (real_end > pos + remain) {
tmp.trim(pos + remain - start);
}
if (start < pos) {
tmp.trim_front(pos - start);
}
return make_ready_future<temporary_buffer<char>>(std::move(tmp));
}
}));
_pos += now;
auto old_pos = _pos;
_pos = end;
_remain = std::max(_pos, old_pos + _remain) - _pos;
};
}
};

class file_data_source : public data_source {
public:
file_data_source(file f, uint64_t offset, file_input_stream_options options)
file_data_source(file f, uint64_t offset, uint64_t len, file_input_stream_options options)
: data_source(std::make_unique<file_data_source_impl>(
std::move(f), offset, options)) {}
std::move(f), offset, len, options)) {}
};


input_stream<char> make_file_input_stream(
file f, uint64_t offset, uint64_t len, file_input_stream_options options) {
return input_stream<char>(file_data_source(std::move(f), offset, len, std::move(options)));
}

input_stream<char> make_file_input_stream(
file f, uint64_t offset, file_input_stream_options options) {
return input_stream<char>(file_data_source(std::move(f), offset, std::move(options)));
return make_file_input_stream(std::move(f), offset, std::numeric_limits<uint64_t>::max(), std::move(options));
}

input_stream<char> make_file_input_stream(
Expand Down
14 changes: 14 additions & 0 deletions core/fstream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#pragma once

/// \file

// File <-> streams adapters
//
// Seastar files are block-based due to the reliance on DMA - you must read
Expand All @@ -40,6 +42,18 @@ struct file_input_stream_options {
::io_priority_class io_priority_class = default_priority_class();
};

/// \brief Creates an input_stream to read a portion of a file.
///
/// \param file File to read; multiple streams for the same file may coexist
/// \param offset Starting offset to read from (no alignment restrictions)
/// \param len Maximum number of bytes to read; the stream will stop at end-of-file
/// even if `offset + len` is beyond end-of-file.
/// \param options A set of options controlling the stream.
///
/// \note Multiple input streams may exist concurrently for the same file.
input_stream<char> make_file_input_stream(
file file, uint64_t offset, uint64_t len, file_input_stream_options options = {});

// Create an input_stream for a given file, with the specified options.
// Multiple fibers of execution (continuations) may safely open
// multiple input streams concurrently for the same file.
Expand Down
62 changes: 62 additions & 0 deletions tests/fstream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#include "core/do_with.hh"
#include "core/seastar.hh"
#include "test-utils.hh"
#include "core/thread.hh"
#include <random>
#include <boost/range/adaptor/transformed.hpp>

struct writer {
output_stream<char> out;
Expand Down Expand Up @@ -189,3 +192,62 @@ SEASTAR_TEST_CASE(test_consume_unaligned_file_large) {
return test_consume_until_end((1 << 20) + 1);
}

SEASTAR_TEST_CASE(test_input_stream_esp_around_eof) {
return seastar::async([] {
auto flen = uint64_t(5341);
auto rdist = std::uniform_int_distribution<char>();
auto reng = std::default_random_engine();
auto data = boost::copy_range<std::vector<uint8_t>>(
boost::irange<uint64_t>(0, flen)
| boost::adaptors::transformed([&] (int x) { return rdist(reng); }));
auto f = open_file_dma("file.tmp",
open_flags::rw | open_flags::create | open_flags::truncate).get0();
auto out = make_file_output_stream(f);
out.write(reinterpret_cast<const char*>(data.data()), data.size()).get();
out.flush().get();
//out.close().get(); // FIXME: closes underlying stream:?!
struct range { uint64_t start; uint64_t end; };
auto ranges = std::vector<range>{{
range{0, flen},
range{0, flen * 2},
range{0, flen + 1},
range{0, flen - 1},
range{0, 1},
range{1, 2},
range{flen - 1, flen},
range{flen - 1, flen + 1},
range{flen, flen + 1},
range{flen + 1, flen + 2},
range{1023, flen-1},
range{1023, flen},
range{1023, flen + 2},
range{8193, 8194},
range{1023, 1025},
range{1023, 1024},
range{1024, 1025},
range{1023, 4097},
}};
auto opt = file_input_stream_options();
opt.buffer_size = 512;
for (auto&& r : ranges) {
auto start = r.start;
auto end = r.end;
auto len = end - start;
auto in = make_file_input_stream(f, start, len, opt);
std::vector<uint8_t> readback;
auto more = true;
while (more) {
auto rdata = in.read().get0();
for (size_t i = 0; i < rdata.size(); ++i) {
readback.push_back(rdata.get()[i]);
}
more = !rdata.empty();
}
//in.close().get();
auto xlen = std::min(end, flen) - std::min(flen, start);
BOOST_REQUIRE_EQUAL(xlen, readback.size());
BOOST_REQUIRE(std::equal(readback.begin(), readback.end(), data.begin() + std::min(start, flen)));
}
f.close().get();
});
}

0 comments on commit 6623379

Please sign in to comment.