Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix flush/close semantics for HTTP files, improve testing #1232

Merged
merged 6 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packager/file/callback_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ int64_t CallbackFile::Write(const void* buffer, uint64_t length) {
return callback_params_->write_func(name_, buffer, length);
}

// Intentional no-op. CallbackFile delegates writes to a user callback and has
// no separate write channel to shut down; the File::CloseForWriting contract
// allows implementations that cannot use the signal to ignore it.
void CallbackFile::CloseForWriting() {}

int64_t CallbackFile::Size() {
LOG(INFO) << "CallbackFile does not support Size().";
return -1;
Expand Down
1 change: 1 addition & 0 deletions packager/file/callback_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class CallbackFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
6 changes: 6 additions & 0 deletions packager/file/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ class SHAKA_EXPORT File {
/// @return Number of bytes written, or a value < 0 on error.
virtual int64_t Write(const void* buffer, uint64_t length) = 0;

/// Close the file for writing. This signals that no more data will be
/// written. Future writes are invalid and their behavior is undefined!
/// Data may still be read from the file after calling this method.
/// Some implementations may ignore this if they cannot use the signal.
virtual void CloseForWriting() = 0;

/// @return Size of the file in bytes. A return value less than zero
/// indicates a problem getting the size.
virtual int64_t Size() = 0;
Expand Down
13 changes: 10 additions & 3 deletions packager/file/http_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,12 @@ bool HttpFile::Open() {

Status HttpFile::CloseWithStatus() {
VLOG(2) << "Closing " << url_;
// Close the cache first so the thread will finish uploading. Otherwise it
// will wait for more data forever.
download_cache_.Close();

// Close the upload cache first so the thread will finish uploading.
// Otherwise it will wait for more data forever.
// Don't close the download cache, so that the server's response (HTTP status
// code at minimum) can still be written after uploading is complete.
// The task will close the download cache when it is complete.
upload_cache_.Close();
task_exit_event_.WaitForNotification();

Expand All @@ -260,6 +263,10 @@ int64_t HttpFile::Write(const void* buffer, uint64_t length) {
return upload_cache_.Write(buffer, length);
}

// Ends the upload side of the transfer. Closing the upload cache tells the
// server, in a chunked upload, that there will be no more chunks; without
// this the request can hang in libcurl. The download cache is left open, so
// the server's response can still be read after uploading is complete.
void HttpFile::CloseForWriting() {
  upload_cache_.Close();
}

int64_t HttpFile::Size() {
VLOG(1) << "HttpFile does not support Size().";
return -1;
Expand Down
1 change: 1 addition & 0 deletions packager/file/http_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class HttpFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
117 changes: 91 additions & 26 deletions packager/file/http_file_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@

#include <gtest/gtest.h>

#include <chrono>
#include <memory>
#include <thread>
#include <vector>

#include "absl/strings/str_split.h"
#include "nlohmann/json.hpp"
#include "packager/file/file.h"
#include "packager/file/file_closer.h"
#include "packager/media/test/test_web_server.h"

#define ASSERT_JSON_STRING(json, key, value) \
ASSERT_EQ(GetJsonString((json), (key)), (value)) << "JSON is " << (json)
Expand All @@ -23,6 +26,23 @@ namespace shaka {

namespace {

// A completely arbitrary port number, unlikely to be in use.
const int kTestServerPort = 58405;
// Base URL of the local test server. Derived from kTestServerPort so the
// port number is defined in exactly one place.
const std::string kTestServerBase =
    "http://localhost:" + std::to_string(kTestServerPort);
// Reflects back the method, body, and headers of the request as JSON.
const std::string kTestServerReflect = kTestServerBase + "/reflect";
// Returns the requested HTTP status code.
const std::string kTestServer404 = kTestServerBase + "/status?code=404";
// Returns after the requested delay.
const std::string kTestServerLongDelay = kTestServerBase + "/delay?seconds=8";
const std::string kTestServerShortDelay = kTestServerBase + "/delay?seconds=1";

// Placeholders for HttpFile constructor arguments that a test doesn't use.
const std::vector<std::string> kNoHeaders;
const std::string kNoContentType;
const std::string kBinaryContentType = "application/octet-stream";
const int kDefaultTestTimeout = 10;  // For a local, embedded server

using FilePtr = std::unique_ptr<HttpFile, FileCloser>;

// Handles keys with dots, indicating a nested field.
Expand Down Expand Up @@ -64,10 +84,25 @@ nlohmann::json HandleResponse(const FilePtr& file) {
return value;
}

// Quoting gtest docs:
// "For each TEST_F, GoogleTest will create a fresh test fixture object,
// immediately call SetUp(), run the test body, call TearDown(), and then
// delete the test fixture object."
// So we don't need a TearDown method. The destructor on TestWebServer is good
// enough.
class HttpFileTest : public testing::Test {
protected:
void SetUp() override { ASSERT_TRUE(server_.Start(kTestServerPort)); }

private:
media::TestWebServer server_;
};

} // namespace

TEST(HttpFileTest, BasicGet) {
FilePtr file(new HttpFile(HttpMethod::kGet, "https://httpbin.org/anything"));
TEST_F(HttpFileTest, BasicGet) {
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServerReflect,
kNoContentType, kNoHeaders, kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -77,10 +112,10 @@ TEST(HttpFileTest, BasicGet) {
ASSERT_JSON_STRING(json, "method", "GET");
}

TEST(HttpFileTest, CustomHeaders) {
TEST_F(HttpFileTest, CustomHeaders) {
std::vector<std::string> headers{"Host: foo", "X-My-Header: Something"};
FilePtr file(new HttpFile(HttpMethod::kGet, "https://httpbin.org/anything",
"", headers, 0));
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServerReflect,
kNoContentType, headers, kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -93,8 +128,10 @@ TEST(HttpFileTest, CustomHeaders) {
ASSERT_JSON_STRING(json, "headers.X-My-Header", "Something");
}

TEST(HttpFileTest, BasicPost) {
FilePtr file(new HttpFile(HttpMethod::kPost, "https://httpbin.org/anything"));
TEST_F(HttpFileTest, BasicPost) {
FilePtr file(new HttpFile(HttpMethod::kPost, kTestServerReflect,
kBinaryContentType, kNoHeaders,
kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -104,13 +141,17 @@ TEST(HttpFileTest, BasicPost) {
static_cast<int64_t>(data.size()));
ASSERT_TRUE(file->Flush());

// Tells the server in a chunked upload that there will be no more chunks.
// If we don't do this, the request can hang in libcurl.
file->CloseForWriting();

auto json = HandleResponse(file);
ASSERT_TRUE(json.is_object());
ASSERT_TRUE(file.release()->Close());

ASSERT_JSON_STRING(json, "method", "POST");
ASSERT_JSON_STRING(json, "data", data);
ASSERT_JSON_STRING(json, "headers.Content-Type", "application/octet-stream");
ASSERT_JSON_STRING(json, "body", data);
ASSERT_JSON_STRING(json, "headers.Content-Type", kBinaryContentType);

// Curl may choose to send chunked or not based on the data. We request
// chunked encoding, but don't control if it is actually used. If we get
Expand All @@ -123,8 +164,10 @@ TEST(HttpFileTest, BasicPost) {
}
}

TEST(HttpFileTest, BasicPut) {
FilePtr file(new HttpFile(HttpMethod::kPut, "https://httpbin.org/anything"));
TEST_F(HttpFileTest, BasicPut) {
FilePtr file(new HttpFile(HttpMethod::kPut, kTestServerReflect,
kBinaryContentType, kNoHeaders,
kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -134,13 +177,17 @@ TEST(HttpFileTest, BasicPut) {
static_cast<int64_t>(data.size()));
ASSERT_TRUE(file->Flush());

// Tells the server in a chunked upload that there will be no more chunks.
// If we don't do this, the request can hang in libcurl.
file->CloseForWriting();

auto json = HandleResponse(file);
ASSERT_TRUE(json.is_object());
ASSERT_TRUE(file.release()->Close());

ASSERT_JSON_STRING(json, "method", "PUT");
ASSERT_JSON_STRING(json, "data", data);
ASSERT_JSON_STRING(json, "headers.Content-Type", "application/octet-stream");
ASSERT_JSON_STRING(json, "body", data);
ASSERT_JSON_STRING(json, "headers.Content-Type", kBinaryContentType);

// Curl may choose to send chunked or not based on the data. We request
// chunked encoding, but don't control if it is actually used. If we get
Expand All @@ -153,8 +200,10 @@ TEST(HttpFileTest, BasicPut) {
}
}

TEST(HttpFileTest, MultipleWrites) {
FilePtr file(new HttpFile(HttpMethod::kPut, "https://httpbin.org/anything"));
TEST_F(HttpFileTest, MultipleWrites) {
FilePtr file(new HttpFile(HttpMethod::kPut, kTestServerReflect,
kBinaryContentType, kNoHeaders,
kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -165,21 +214,35 @@ TEST(HttpFileTest, MultipleWrites) {

ASSERT_EQ(file->Write(data1.data(), data1.size()),
static_cast<int64_t>(data1.size()));
ASSERT_TRUE(file->Flush());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we call Flush after every write in this test? Is it needed after every write?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It adds coverage for the change in Flush() semantics from #1201. Now that Flush() only flushes, this shows the new behavior. Before #1201, you could not Write() after Flush().

In the test server, you can see those pieces written as separate chunks now. This isn't used yet, but it may be an important detail for chunked uploads for LL streaming in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It is probably better to add another MultipleWritesWithFlush to test this case. Otherwise consecutive Writes without Flush is not tested.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Flush the first chunk.

ASSERT_EQ(file->Write(data2.data(), data2.size()),
static_cast<int64_t>(data2.size()));
ASSERT_TRUE(file->Flush());
// Flush the second chunk.

ASSERT_EQ(file->Write(data3.data(), data3.size()),
static_cast<int64_t>(data3.size()));
ASSERT_TRUE(file->Flush());
// Flush the third chunk.

ASSERT_EQ(file->Write(data4.data(), data4.size()),
static_cast<int64_t>(data4.size()));
ASSERT_TRUE(file->Flush());
// Flush the fourth chunk.

// Tells the server in a chunked upload that there will be no more chunks.
// If we don't do this, the request can hang in libcurl.
file->CloseForWriting();

auto json = HandleResponse(file);
ASSERT_TRUE(json.is_object());
ASSERT_TRUE(file.release()->Close());

ASSERT_JSON_STRING(json, "method", "PUT");
ASSERT_JSON_STRING(json, "data", data1 + data2 + data3 + data4);
ASSERT_JSON_STRING(json, "headers.Content-Type", "application/octet-stream");
ASSERT_JSON_STRING(json, "body", data1 + data2 + data3 + data4);
ASSERT_JSON_STRING(json, "headers.Content-Type", kBinaryContentType);

// Curl may choose to send chunked or not based on the data. We request
// chunked encoding, but don't control if it is actually used. If we get
Expand All @@ -195,9 +258,9 @@ TEST(HttpFileTest, MultipleWrites) {

// TODO: Test chunked uploads explicitly.

TEST(HttpFileTest, Error404) {
FilePtr file(
new HttpFile(HttpMethod::kGet, "https://httpbin.org/status/404"));
TEST_F(HttpFileTest, Error404) {
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServer404, kNoContentType,
kNoHeaders, kDefaultTestTimeout));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -210,9 +273,10 @@ TEST(HttpFileTest, Error404) {
ASSERT_EQ(status.error_code(), error::HTTP_FAILURE);
}

TEST(HttpFileTest, TimeoutTriggered) {
FilePtr file(
new HttpFile(HttpMethod::kGet, "https://httpbin.org/delay/8", "", {}, 1));
TEST_F(HttpFileTest, TimeoutTriggered) {
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServerLongDelay,
kNoContentType, kNoHeaders,
1 /* timeout in seconds */));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand All @@ -225,9 +289,10 @@ TEST(HttpFileTest, TimeoutTriggered) {
ASSERT_EQ(status.error_code(), error::TIME_OUT);
}

TEST(HttpFileTest, TimeoutNotTriggered) {
FilePtr file(
new HttpFile(HttpMethod::kGet, "https://httpbin.org/delay/1", "", {}, 5));
TEST_F(HttpFileTest, TimeoutNotTriggered) {
FilePtr file(new HttpFile(HttpMethod::kGet, kTestServerShortDelay,
kNoContentType, kNoHeaders,
5 /* timeout in seconds */));
ASSERT_TRUE(file);
ASSERT_TRUE(file->Open());

Expand Down
2 changes: 2 additions & 0 deletions packager/file/local_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ int64_t LocalFile::Write(const void* buffer, uint64_t length) {
return bytes_written;
}

// Intentional no-op. A local file has no upload channel to signal; data can
// still be written until Close(). The File::CloseForWriting contract allows
// implementations that cannot use the signal to ignore it.
void LocalFile::CloseForWriting() {}

int64_t LocalFile::Size() {
DCHECK(internal_file_ != NULL);

Expand Down
1 change: 1 addition & 0 deletions packager/file/local_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class LocalFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
2 changes: 2 additions & 0 deletions packager/file/memory_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ int64_t MemoryFile::Write(const void* buffer, uint64_t length) {
return length;
}

// Intentional no-op. An in-memory file has no transport to signal; the
// File::CloseForWriting contract allows implementations that cannot use the
// signal to ignore it.
void MemoryFile::CloseForWriting() {}

int64_t MemoryFile::Size() {
DCHECK(file_);
return file_->size();
Expand Down
1 change: 1 addition & 0 deletions packager/file/memory_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MemoryFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
2 changes: 2 additions & 0 deletions packager/file/threaded_io_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) {
return bytes_written;
}

// Intentional no-op. NOTE(review): this does not forward the signal to the
// wrapped internal_file_ — presumably because the wrapped file is written
// from a background I/O thread and forwarding immediately could cut off
// queued data; confirm whether wrapped files (e.g. HttpFile) ever need the
// signal through this wrapper.
void ThreadedIoFile::CloseForWriting() {}

int64_t ThreadedIoFile::Size() {
DCHECK(internal_file_);

Expand Down
1 change: 1 addition & 0 deletions packager/file/threaded_io_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ThreadedIoFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
8 changes: 8 additions & 0 deletions packager/file/udp_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ int64_t UdpFile::Write(const void* buffer, uint64_t length) {
return -1;
}

void UdpFile::CloseForWriting() {
#if defined(OS_WIN)
shutdown(socket_, SD_SEND);
#else
shutdown(socket_, SHUT_WR);
#endif
}

int64_t UdpFile::Size() {
if (socket_ == INVALID_SOCKET)
return -1;
Expand Down
1 change: 1 addition & 0 deletions packager/file/udp_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class UdpFile : public File {
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
void CloseForWriting() override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
Expand Down
3 changes: 2 additions & 1 deletion packager/media/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,6 @@ target_link_libraries(media_base_unittest
gmock
gtest
gtest_main
test_data_util)
test_data_util
test_web_server)
add_test(NAME media_base_unittest COMMAND media_base_unittest)
1 change: 1 addition & 0 deletions packager/media/base/http_key_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Status HttpKeyFetcher::FetchInternal(HttpMethod method,
}
file->Write(data.data(), data.size());
file->Flush();
file->CloseForWriting();

while (true) {
char temp[kBufferSize];
Expand Down
Loading