forked from facebook/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathline_file_reader.cc
73 lines (66 loc) · 2.3 KB
/
line_file_reader.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "file/line_file_reader.h"
#include <cstring>
#include "monitoring/iostats_context_imp.h"
namespace ROCKSDB_NAMESPACE {
IOStatus LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<LineFileReader>* reader,
IODebugContext* dbg,
RateLimiter* rate_limiter) {
std::unique_ptr<FSSequentialFile> file;
IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (io_s.ok()) {
reader->reset(new LineFileReader(
std::move(file), fname, nullptr,
std::vector<std::shared_ptr<EventListener>>{}, rate_limiter));
}
return io_s;
}
bool LineFileReader::ReadLine(std::string* out,
Env::IOPriority rate_limiter_priority) {
assert(out);
if (!io_status_.ok()) {
// Status should be checked (or permit unchecked) any time we return false.
io_status_.MustCheck();
return false;
}
out->clear();
for (;;) {
// Look for line delimiter
const char* found = static_cast<const char*>(
std::memchr(buf_begin_, '\n', buf_end_ - buf_begin_));
if (found) {
size_t len = found - buf_begin_;
out->append(buf_begin_, len);
buf_begin_ += len + /*delim*/ 1;
++line_number_;
return true;
}
if (at_eof_) {
io_status_.MustCheck();
return false;
}
// else flush and reload buffer
out->append(buf_begin_, buf_end_ - buf_begin_);
Slice result;
io_status_ =
sfr_.Read(buf_.size(), &result, buf_.data(), rate_limiter_priority);
IOSTATS_ADD(bytes_read, result.size());
if (!io_status_.ok()) {
io_status_.MustCheck();
return false;
}
if (result.size() != buf_.size()) {
// The obscure way of indicating EOF
at_eof_ = true;
}
buf_begin_ = result.data();
buf_end_ = result.data() + result.size();
}
}
} // namespace ROCKSDB_NAMESPACE