Skip to content

Commit

Permalink
fix: Fix flush/close semantics for HTTP files, improve testing (#1232)
Browse files Browse the repository at this point in the history
All HTTP-based tests now use an embedded test server instead of
httpbin.org, which makes them much faster and more reliable.

These more reliable tests also exposed some issues that began recently
with PR #1201.  HttpFile's Flush() semantics were different than those
documented for files in general.  Flush() used to close the file for
uploading, so that no further writes were allowed, but the documentation
stated that it would only flush data to its destination.  PR #1201
brought HttpFile's Flush() in line with the docs, but gave us no way to
terminate a chunked upload.

This adds a new method to File called CloseForWriting(), which
terminates a chunked upload for HttpFile.  The only other implementation
that does anything is UdpFile, which uses the socket library function
shutdown() to terminate writes while allowing reads.

This also tweaks HttpFile::CloseWithStatus() so that it will not
generate an error if the file is closed before the HTTP response is
written to the download cache.

This adds a new test, HttpFileTest.MultipleChunks, in which the file is
Flushed after each chunk.  This adds test coverage for the changes
introduced in PR #1201.

Fixes #1224 (missing test coverage for HttpFile::Flush)
  • Loading branch information
joeyparrish authored Jul 14, 2023
1 parent d4fcfb2 commit af98d48
Show file tree
Hide file tree
Showing 18 changed files with 203 additions and 53 deletions.
2 changes: 2 additions & 0 deletions packager/file/callback_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ int64_t CallbackFile::Write(const void* buffer, uint64_t length) {
return callback_params_->write_func(name_, buffer, length);
}

// Intentionally a no-op: this implementation takes no action when told that
// no more data will be written.
void CallbackFile::CloseForWriting() {}

int64_t CallbackFile::Size() {
LOG(INFO) << "CallbackFile does not support Size().";
return -1;
Expand Down
1 change: 1 addition & 0 deletions packager/file/callback_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class CallbackFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
6 changes: 6 additions & 0 deletions packager/file/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ class SHAKA_EXPORT File {
/// @return Number of bytes written, or a value < 0 on error.
virtual int64_t Write(const void* buffer, uint64_t length) = 0;

/// Close the file for writing. This signals that no more data will be
/// written. Future writes are invalid and their behavior is undefined!
/// Data may still be read from the file after calling this method.
/// Some implementations may ignore this if they cannot use the signal.
virtual void CloseForWriting() = 0;

/// @return Size of the file in bytes. A return value less than zero
/// indicates a problem getting the size.
virtual int64_t Size() = 0;
Expand Down
14 changes: 11 additions & 3 deletions packager/file/http_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,12 @@ bool HttpFile::Open() {

Status HttpFile::CloseWithStatus() {
VLOG(2) << "Closing " << url_;
// Close the cache first so the thread will finish uploading. Otherwise it
// will wait for more data forever.
download_cache_.Close();

// Close the upload cache first so the thread will finish uploading.
// Otherwise it will wait for more data forever.
// Don't close the download cache, so that the server's response (HTTP status
// code at minimum) can still be written after uploading is complete.
// The task will close the download cache when it is complete.
upload_cache_.Close();
task_exit_event_.WaitForNotification();

Expand All @@ -260,6 +263,11 @@ int64_t HttpFile::Write(const void* buffer, uint64_t length) {
return upload_cache_.Write(buffer, length);
}

// Signals that no more data will be written to this HTTP file.
// Only the upload cache is closed here; the download cache is left alone.
// NOTE(review): CloseWithStatus() also calls upload_cache_.Close(), so the
// cache may be closed twice -- presumably harmless, but confirm.
void HttpFile::CloseForWriting() {
VLOG(2) << "Closing further writes to " << url_;
upload_cache_.Close();
}

int64_t HttpFile::Size() {
VLOG(1) << "HttpFile does not support Size().";
return -1;
Expand Down
1 change: 1 addition & 0 deletions packager/file/http_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class HttpFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
152 changes: 122 additions & 30 deletions packager/file/http_file_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "nlohmann/json.hpp"
#include "packager/file/file.h"
#include "packager/file/file_closer.h"
#include "packager/media/test/test_web_server.h"

#define ASSERT_JSON_STRING(json, key, value) \
ASSERT_EQ(GetJsonString((json), (key)), (value)) << "JSON is " << (json)
Expand All @@ -23,6 +24,23 @@ namespace shaka {

namespace {

// A completely arbitrary port number, unlikely to be in use.
const int kTestServerPort = 58405;
// Base URL of the embedded test server.  Built from kTestServerPort so that
// the port number is spelled out in exactly one place and the URLs below can
// never disagree with the port the server is started on.
const std::string kTestServerBaseUrl =
    "http://localhost:" + std::to_string(kTestServerPort);
// Reflects back the method, body, and headers of the request as JSON.
const std::string kTestServerReflect = kTestServerBaseUrl + "/reflect";
// Returns the requested HTTP status code.
const std::string kTestServer404 = kTestServerBaseUrl + "/status?code=404";
// Returns after the requested delay.
const std::string kTestServerLongDelay =
    kTestServerBaseUrl + "/delay?seconds=8";
const std::string kTestServerShortDelay =
    kTestServerBaseUrl + "/delay?seconds=1";

// Convenience values for tests that send no extra headers / no content type.
const std::vector<std::string> kNoHeaders;
const std::string kNoContentType;
const std::string kBinaryContentType = "application/octet-stream";
const int kDefaultTestTimeout = 10;  // For a local, embedded server

using FilePtr = std::unique_ptr<HttpFile, FileCloser>;

// Handles keys with dots, indicating a nested field.
Expand Down Expand Up @@ -64,10 +82,25 @@ nlohmann::json HandleResponse(const FilePtr& file) {
return value;
}

// Test fixture that starts an embedded test web server before each test.
//
// Quoting gtest docs:
// "For each TEST_F, GoogleTest will create a fresh test fixture object,
// immediately call SetUp(), run the test body, call TearDown(), and then
// delete the test fixture object."
// So we don't need a TearDown method. The destructor on TestWebServer is good
// enough.
class HttpFileTest : public testing::Test {
protected:
// Starts the server on kTestServerPort; the test fails fatally if the server
// cannot be started.
void SetUp() override { ASSERT_TRUE(server_.Start(kTestServerPort)); }

private:
// Owned by the fixture, so each test gets its own server instance.
media::TestWebServer server_;
};

} // namespace

TEST(HttpFileTest, BasicGet) {
FilePtr file(new HttpFile(HttpMethod::kGet, "https://httpbin.org/anything"));
TEST_F(HttpFileTest, BasicGet) {
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServerReflect,
kNoContentType, kNoHeaders, kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -77,10 +110,10 @@ TEST(HttpFileTest, BasicGet) {
ASSERT_JSON_STRING(json, "method", "GET");
}

TEST(HttpFileTest, CustomHeaders) {
TEST_F(HttpFileTest, CustomHeaders) {
std::vector<std::string> headers{"Host: foo", "X-My-Header: Something"};
FilePtr file(new HttpFile(HttpMethod::kGet, "https://httpbin.org/anything",
"", headers, 0));
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServerReflect,
kNoContentType, headers, kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -93,24 +126,28 @@ TEST(HttpFileTest, CustomHeaders) {
ASSERT_JSON_STRING(json, "headers.X-My-Header", "Something");
}

TEST(HttpFileTest, BasicPost) {
FilePtr file(new HttpFile(HttpMethod::kPost, "https://httpbin.org/anything"));
TEST_F(HttpFileTest, BasicPost) {
FilePtr file(new HttpFile(HttpMethod::kPost, kTestServerReflect,
kBinaryContentType, kNoHeaders,
kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

const std::string data = "abcd";

ASSERT_EQ(file->Write(data.data(), data.size()),
static_cast<int64_t>(data.size()));
ASSERT_TRUE(file->Flush());
// Signal that there will be no more writes.
// If we don't do this, the request can hang in libcurl.
file->CloseForWriting();

auto json = HandleResponse(file);
ASSERT_TRUE(json.is_object());
ASSERT_TRUE(file.release()->Close());

ASSERT_JSON_STRING(json, "method", "POST");
ASSERT_JSON_STRING(json, "data", data);
ASSERT_JSON_STRING(json, "headers.Content-Type", "application/octet-stream");
ASSERT_JSON_STRING(json, "body", data);
ASSERT_JSON_STRING(json, "headers.Content-Type", kBinaryContentType);

// Curl may choose to send chunked or not based on the data. We request
// chunked encoding, but don't control if it is actually used. If we get
Expand All @@ -123,24 +160,28 @@ TEST(HttpFileTest, BasicPost) {
}
}

TEST(HttpFileTest, BasicPut) {
FilePtr file(new HttpFile(HttpMethod::kPut, "https://httpbin.org/anything"));
TEST_F(HttpFileTest, BasicPut) {
FilePtr file(new HttpFile(HttpMethod::kPut, kTestServerReflect,
kBinaryContentType, kNoHeaders,
kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

const std::string data = "abcd";

ASSERT_EQ(file->Write(data.data(), data.size()),
static_cast<int64_t>(data.size()));
ASSERT_TRUE(file->Flush());
// Signal that there will be no more writes.
// If we don't do this, the request can hang in libcurl.
file->CloseForWriting();

auto json = HandleResponse(file);
ASSERT_TRUE(json.is_object());
ASSERT_TRUE(file.release()->Close());

ASSERT_JSON_STRING(json, "method", "PUT");
ASSERT_JSON_STRING(json, "data", data);
ASSERT_JSON_STRING(json, "headers.Content-Type", "application/octet-stream");
ASSERT_JSON_STRING(json, "body", data);
ASSERT_JSON_STRING(json, "headers.Content-Type", kBinaryContentType);

// Curl may choose to send chunked or not based on the data. We request
// chunked encoding, but don't control if it is actually used. If we get
Expand All @@ -153,8 +194,10 @@ TEST(HttpFileTest, BasicPut) {
}
}

TEST(HttpFileTest, MultipleWrites) {
FilePtr file(new HttpFile(HttpMethod::kPut, "https://httpbin.org/anything"));
TEST_F(HttpFileTest, MultipleWrites) {
FilePtr file(new HttpFile(HttpMethod::kPut, kTestServerReflect,
kBinaryContentType, kNoHeaders,
kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -171,15 +214,17 @@ TEST(HttpFileTest, MultipleWrites) {
static_cast<int64_t>(data3.size()));
ASSERT_EQ(file->Write(data4.data(), data4.size()),
static_cast<int64_t>(data4.size()));
ASSERT_TRUE(file->Flush());
// Signal that there will be no more writes.
// If we don't do this, the request can hang in libcurl.
file->CloseForWriting();

auto json = HandleResponse(file);
ASSERT_TRUE(json.is_object());
ASSERT_TRUE(file.release()->Close());

ASSERT_JSON_STRING(json, "method", "PUT");
ASSERT_JSON_STRING(json, "data", data1 + data2 + data3 + data4);
ASSERT_JSON_STRING(json, "headers.Content-Type", "application/octet-stream");
ASSERT_JSON_STRING(json, "body", data1 + data2 + data3 + data4);
ASSERT_JSON_STRING(json, "headers.Content-Type", kBinaryContentType);

// Curl may choose to send chunked or not based on the data. We request
// chunked encoding, but don't control if it is actually used. If we get
Expand All @@ -193,11 +238,56 @@ TEST(HttpFileTest, MultipleWrites) {
}
}

// TODO: Test chunked uploads explicitly.
// Writes four pieces of data with a Flush() after every write, then checks
// that the server saw a single PUT whose body is the concatenation of all
// pieces and whose transfer encoding is chunked.
TEST_F(HttpFileTest, MultipleChunks) {
  FilePtr file(new HttpFile(HttpMethod::kPut, kTestServerReflect,
                            kBinaryContentType, kNoHeaders,
                            kDefaultTestTimeout));
  ASSERT_TRUE(file);
  ASSERT_TRUE(file->Open());

  // Each of these is written and then flushed on its own, in order.
  const std::vector<std::string> pieces = {"abcd", "efgh", "ijkl", "mnop"};

  // Accumulates what the server is expected to reflect back as the body.
  std::string expected_body;
  for (const std::string& piece : pieces) {
    ASSERT_EQ(file->Write(piece.data(), piece.size()),
              static_cast<int64_t>(piece.size()));
    // Flush this piece before writing the next one.
    ASSERT_TRUE(file->Flush());
    expected_body += piece;
  }

  // Signal that there will be no more writes.
  // If we don't do this, the request can hang in libcurl.
  file->CloseForWriting();

  auto json = HandleResponse(file);
  ASSERT_TRUE(json.is_object());
  ASSERT_TRUE(file.release()->Close());

  ASSERT_JSON_STRING(json, "method", "PUT");
  ASSERT_JSON_STRING(json, "body", expected_body);
  ASSERT_JSON_STRING(json, "headers.Content-Type", kBinaryContentType);
  ASSERT_JSON_STRING(json, "headers.Transfer-Encoding", "chunked");
}

TEST(HttpFileTest, Error404) {
FilePtr file(
new HttpFile(HttpMethod::kGet, "https://httpbin.org/status/404"));
TEST_F(HttpFileTest, Error404) {
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServer404, kNoContentType,
kNoHeaders, kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -210,9 +300,10 @@ TEST(HttpFileTest, Error404) {
ASSERT_EQ(status.error_code(), error::HTTP_FAILURE);
}

TEST(HttpFileTest, TimeoutTriggered) {
FilePtr file(
new HttpFile(HttpMethod::kGet, "https://httpbin.org/delay/8", "", {}, 1));
TEST_F(HttpFileTest, TimeoutTriggered) {
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServerLongDelay,
kNoContentType, kNoHeaders,
1 /* timeout in seconds */));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -225,9 +316,10 @@ TEST(HttpFileTest, TimeoutTriggered) {
ASSERT_EQ(status.error_code(), error::TIME_OUT);
}

TEST(HttpFileTest, TimeoutNotTriggered) {
FilePtr file(
new HttpFile(HttpMethod::kGet, "https://httpbin.org/delay/1", "", {}, 5));
TEST_F(HttpFileTest, TimeoutNotTriggered) {
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServerShortDelay,
kNoContentType, kNoHeaders,
5 /* timeout in seconds */));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand Down
2 changes: 2 additions & 0 deletions packager/file/local_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ int64_t LocalFile::Write(const void* buffer, uint64_t length) {
return bytes_written;
}

// Intentionally a no-op: this implementation takes no action when told that
// no more data will be written.
void LocalFile::CloseForWriting() {}

int64_t LocalFile::Size() {
DCHECK(internal_file_ != NULL);

Expand Down
1 change: 1 addition & 0 deletions packager/file/local_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class LocalFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
2 changes: 2 additions & 0 deletions packager/file/memory_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ int64_t MemoryFile::Write(const void* buffer, uint64_t length) {
return length;
}

// Intentionally a no-op: this implementation takes no action when told that
// no more data will be written.
void MemoryFile::CloseForWriting() {}

int64_t MemoryFile::Size() {
DCHECK(file_);
return file_->size();
Expand Down
1 change: 1 addition & 0 deletions packager/file/memory_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MemoryFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
2 changes: 2 additions & 0 deletions packager/file/threaded_io_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) {
return bytes_written;
}

// No-op: the signal that no more data will be written is ignored here.
// NOTE(review): this class holds an internal_file_, and the signal is not
// forwarded to it. If the wrapped file can act on CloseForWriting(), that
// effect is lost when going through this class -- confirm this is intended.
void ThreadedIoFile::CloseForWriting() {}

int64_t ThreadedIoFile::Size() {
DCHECK(internal_file_);

Expand Down
1 change: 1 addition & 0 deletions packager/file/threaded_io_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ThreadedIoFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
8 changes: 8 additions & 0 deletions packager/file/udp_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ int64_t UdpFile::Write(const void* buffer, uint64_t length) {
return -1;
}

// Shuts down the sending direction of the socket. Only the "send" side is
// named in the shutdown() call, so the receiving direction is not shut down
// here. The return value of shutdown() is not checked.
void UdpFile::CloseForWriting() {
  // The "stop sending" constant has a different name on Windows than on
  // other platforms; pick the right one, then make a single shutdown() call.
#if defined(OS_WIN)
  const int how = SD_SEND;
#else
  const int how = SHUT_WR;
#endif
  shutdown(socket_, how);
}

int64_t UdpFile::Size() {
if (socket_ == INVALID_SOCKET)
return -1;
Expand Down
Loading

0 comments on commit af98d48

Please sign in to comment.