Skip to content

Commit 357a86b

Browse files
committed
fixup! fixup! ...
1 parent 81f1f92 commit 357a86b

File tree

1 file changed

+59
-18
lines changed

1 file changed

+59
-18
lines changed

src/dataqueue/queue.cc

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,8 @@ class DataQueueEntry : public EntryBase {
797797

798798
// Essentially an entry that exists to give the Javascript side
799799
// control of what happens when data is read. Always non-idempotent.
800-
class StreamEntry final : public BaseObject, public EntryBase {
800+
class StreamEntry final : public BaseObject,
801+
public EntryBase {
801802
public:
802803
static void New(const FunctionCallbackInfo<Value>& args) {
803804
CHECK(args.IsConstructCall());
@@ -911,8 +912,8 @@ class StreamEntry final : public BaseObject, public EntryBase {
911912
// is provided by an underlying StreamBase implementation. Data is never
912913
// buffered and the size is never known in advance.
913914
class StreamBaseEntry final : public BaseObject,
914-
public EntryBase{
915-
public:
915+
public EntryBase {
916+
public:
916917
static void New(const FunctionCallbackInfo<Value>& args) {
917918
CHECK(args.IsConstructCall());
918919
Environment* env = Environment::GetCurrent(args);
@@ -1081,17 +1082,19 @@ class StreamBaseEntry final : public BaseObject,
10811082

10821083
// ============================================================================
10831084

1084-
class FdEntry : public EntryBase {
1085+
class FdEntry final : public EntryBase {
10851086
public:
1086-
FdEntry(int fd) : fd_(fd), start_(0) {
1087+
FdEntry(int fd, size_t start, size_t end = 0)
1088+
: fd_(fd), start_(0), end_(end) {
10871089
CHECK(fd);
1088-
uv_fs_t req;
1089-
uv_fs_fstat(nullptr, &req, fd, nullptr);
1090-
1091-
stat_ = req.statbuf;
1092-
end_ = stat_.st_size;
1090+
err_ = UpdateStat();
1091+
if (err_ == 0 && (end_ == 0 || end_ > stat_.st_size))
1092+
end_ = stat_.st_size;
10931093
}
10941094

1095+
FdEntry(int fd, uv_stat_t stat, size_t start, size_t end)
1096+
: fd_(fd), start_(start), end_(end), stat_(stat) {}
1097+
10951098
std::unique_ptr<DataQueue::Reader> getReader() override {
10961099
// TODO(@flakey5): streambase reader w/ validation
10971100
return nullptr;
@@ -1100,30 +1103,68 @@ class FdEntry : public EntryBase {
11001103
std::unique_ptr<Entry> slice(
11011104
size_t start,
11021105
Maybe<size_t> end = Nothing<size_t>()) override {
1103-
size_t newSize = end.IsJust() ? end.ToChecked() : end_;
1106+
size_t new_start = start_ + start;
1107+
size_t new_end = end_;
1108+
if (end.IsJust()) {
1109+
new_end = std::min(end.FromJust() + start, new_end);
1110+
}
11041111

1105-
CHECK(start >= start_);
1106-
CHECK(newSize <= end_);
1112+
CHECK(new_start >= start_);
1113+
CHECK(new_end <= end_);
11071114

1108-
return std::make_unique<FdEntry>(fd_, stat_, start, newSize);
1115+
return std::make_unique<FdEntry>(fd_, stat_, new_start, new_end);
11091116
}
11101117

11111118
Maybe<size_t> size() const override {
11121119
return Just(end_ - start_);
11131120
}
11141121

1115-
bool isIdempotent() const override final {
1122+
bool isIdempotent() const override {
11161123
return true;
11171124
}
11181125

1126+
SET_NO_MEMORY_INFO()
1127+
SET_MEMORY_INFO_NAME(FdEntry)
1128+
SET_SELF_SIZE(FdEntry)
1129+
1130+
class Reader : public DataQueue::Reader {
1131+
public:
1132+
Reader(Environment* env, int fd) : env_(env), fd_(fd) {}
1133+
1134+
int Pull(
1135+
Next next,
1136+
int options,
1137+
DataQueue::Vec* data,
1138+
size_t count,
1139+
size_t max_count_hint = bob::kMaxCountHint) override {
1140+
uv_fs_fstat(env_->event_loop(), &req_, fd_, &OnStat);
1141+
}
1142+
1143+
private:
1144+
Environment* env_;
1145+
int fd_;
1146+
uv_fs_t req_;
1147+
1148+
static void OnStat(uv_fs_t* req) {
1149+
uv_stat_t current_stat = req->statbuf;
1150+
// Compare the current stat to make sure it has not changed...
1151+
// Then read.
1152+
// call uv_fs_read...
1153+
}
1154+
};
1155+
11191156
private:
1120-
FdEntry(int fd, uv_stat_t stat, size_t start, size_t end)
1121-
: fd_(fd), stat_(stat), start_(start), end_(end) {}
1157+
int UpdateStat() {
1158+
uv_fs_t req;
1159+
uv_fs_fstat(nullptr, &req, fd_, nullptr);
1160+
stat_ = req.statbuf;
1161+
}
11221162

11231163
int fd_;
1124-
uv_stat_t stat_;
1164+
int err_ = 0;
11251165
size_t start_;
11261166
size_t end_;
1167+
uv_stat_t stat_;
11271168
};
11281169

11291170
// ============================================================================

0 commit comments

Comments
 (0)