Skip to content

Commit

Permalink
Add ability to add debug output to docker/singularity commands at bui…
Browse files Browse the repository at this point in the history
…lder level. Fix, hopefully, gitlab docker registry building
  • Loading branch information
atj committed Jan 17, 2018
1 parent 21641bf commit 70f6232
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 142 deletions.
46 changes: 27 additions & 19 deletions Builder/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace bp = boost::process;

using callback_type = std::function<void(const boost::system::error_code&, std::size_t size)>;
using callback_type = std::function<void(const boost::system::error_code &, std::size_t size)>;

ClientData read_client_data(websocket::stream<tcp::socket>& client_stream) {
ClientData read_client_data(websocket::stream<tcp::socket> &client_stream) {
Logger::info("Reading client data");

Logger::info("Reading serialized client data");
Expand All @@ -39,11 +39,11 @@ ClientData read_client_data(websocket::stream<tcp::socket>& client_stream) {
return client_data;
}

void read_file(websocket::stream<tcp::socket>& client_stream,
const std::string& file_name) {
void read_file(websocket::stream<tcp::socket> &client_stream,
const std::string &file_name) {
Logger::info("Opening " + file_name + " for writing");
std::ofstream file;
file.exceptions ( std::ofstream::failbit | std::ofstream::badbit );
file.exceptions(std::ofstream::failbit | std::ofstream::badbit);
file.open(file_name, std::fstream::out | std::fstream::binary | std::fstream::trunc);

// Read in file from websocket in chunks
Expand Down Expand Up @@ -74,26 +74,34 @@ void read_file(websocket::stream<tcp::socket>& client_stream,
Logger::info(file_name + " successfully read file");
}

std::string build_command(const ClientData& client_data) {
std::string build_command(const ClientData &client_data) {

std::string build_command;

// Basic build command
if (client_data.backend == BackendType::singularity) {
build_command = "/usr/bin/sudo /usr/local/bin/SingularityBuilderBackend";
} else if(client_data.backend == BackendType::docker) {
} else if (client_data.backend == BackendType::docker) {
build_command = "/usr/bin/sudo /usr/local/bin/DockerBuilderBackend";
} else {
throw std::runtime_error("Invalid builder backend");
}

// Enable debug output if requested
if (client_data.log_priority >= LogPriority::debug) {
build_command += " --debug";
}

Logger::info("Build command prepared: " + build_command);

return build_command;
}

void stream_build(websocket::stream<tcp::socket>& client_stream,
const std::string& build_command) {
void stream_build(websocket::stream<tcp::socket> &client_stream,
const std::string &build_command) {

Logger::info("Starting subprocess and redirecting output to pipe");
auto& io_context = client_stream.get_executor().context();
auto &io_context = client_stream.get_executor().context();
bp::async_pipe std_pipe{io_context};
bp::group group;
bp::child build_child(build_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group);
Expand All @@ -103,9 +111,9 @@ void stream_build(websocket::stream<tcp::socket>& client_stream,
const auto max_read_bytes = 4096;
std::array<char, max_read_bytes> std_buffer;

callback_type stream_output = [&] (const boost::system::error_code& error,
std::size_t bytes_read) {
if(error == asio::error::eof) {
callback_type stream_output = [&](const boost::system::error_code &error,
std::size_t bytes_read) {
if (error == asio::error::eof) {
client_stream.write_some(true, asio::buffer(std_buffer.data(), bytes_read));
return;
} else {
Expand All @@ -125,19 +133,19 @@ void stream_build(websocket::stream<tcp::socket>& client_stream,
auto exit_code_string = std::to_string(exit_code);
client_stream.write(asio::buffer(exit_code_string));

if(exit_code != 0) {
if (exit_code != 0) {
throw std::runtime_error("Container build failed");
}

Logger::info("Done streaming subprocess output");
}

void write_file(websocket::stream<tcp::socket>& client_stream,
const std::string& file_name) {
void write_file(websocket::stream<tcp::socket> &client_stream,
const std::string &file_name) {
Logger::info("Opening " + file_name + " for reading");

std::ifstream container;
container.exceptions ( std::ofstream::failbit | std::ofstream::badbit );
container.exceptions(std::ofstream::failbit | std::ofstream::badbit);
container.open(file_name, std::fstream::in | std::fstream::binary);
const auto file_size = boost::filesystem::file_size(file_name);

Expand All @@ -153,7 +161,7 @@ void write_file(websocket::stream<tcp::socket>& client_stream,
container.read(chunk_buffer.data(), chunk_size);
container_csc.process_bytes(chunk_buffer.data(), chunk_size);
bytes_remaining -= chunk_size;
if(bytes_remaining == 0)
if (bytes_remaining == 0)
fin = true;
client_stream.write_some(fin, asio::buffer(chunk_buffer.data(), chunk_size));
} while (!fin);
Expand Down Expand Up @@ -210,7 +218,7 @@ int main(int argc, char *argv[]) {
// Attempt to close connection
try {
client_stream.close(websocket::close_code::normal);
} catch(...) {
} catch (...) {
Logger::error("Failed to cleanly close the WebSocket");
}
return 0;
Expand Down
14 changes: 9 additions & 5 deletions BuilderQueue/include/BuilderQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace asio = boost::asio;

using FetchHandler = std::function<void (BuilderData builder)>;
using FetchHandler = std::function<void(BuilderData builder)>;

// Note that the handler, from Connection, contains 'self' which maintains the lifetime of the connection instance
// If a client disconnects while the handler is pending it won't be cleaned up
Expand All @@ -24,10 +24,13 @@ class BuilderQueue {
}

// Don't allow the queue to be copied or moved
BuilderQueue(const BuilderQueue&) =delete;
BuilderQueue& operator=(const BuilderQueue&) =delete;
BuilderQueue(BuilderQueue&&) noexcept =delete;
BuilderQueue& operator=(BuilderQueue&&) =delete;
BuilderQueue(const BuilderQueue &) = delete;

BuilderQueue &operator=(const BuilderQueue &) = delete;

BuilderQueue(BuilderQueue &&) noexcept = delete;

BuilderQueue &operator=(BuilderQueue &&) = delete;


// Add the specified handler to the queue
Expand All @@ -46,6 +49,7 @@ class BuilderQueue {

// Create reserve builders as needed
void create_reserve_builders();

private:
asio::io_context &io_context;

Expand Down
7 changes: 3 additions & 4 deletions BuilderQueue/include/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ namespace websocket = beast::websocket;

class Connection : public std::enable_shared_from_this<Connection> {
public:
explicit Connection(tcp::socket socket, BuilderQueue& queue) : stream(std::move(socket)),
queue(queue)
{};
explicit Connection(tcp::socket socket, BuilderQueue &queue) : stream(std::move(socket)),
queue(queue) {};

~Connection() {
if(builder) {
if (builder) {
queue.return_builder(builder.get());
}
Logger::info("Connection ending");
Expand Down
4 changes: 2 additions & 2 deletions BuilderQueue/include/OpenStack.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ namespace pt = boost::property_tree;

class OpenStack : public std::enable_shared_from_this<OpenStack> {
public:
OpenStack(asio::io_context &io_context) : io_context(io_context),
output_pipe(io_context) {}
explicit OpenStack(asio::io_context &io_context) : io_context(io_context),
output_pipe(io_context) {}

// Request to create a new builder
// The handler must have the form void(std::error_code error, BuilderData builder)
Expand Down
13 changes: 8 additions & 5 deletions BuilderQueue/include/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ using asio::ip::tcp;

class Server {
public:
Server(asio::io_context &io_context, BuilderQueue& queue) :
Server(asio::io_context &io_context, BuilderQueue &queue) :
acceptor(io_context, tcp::endpoint(tcp::v4(), 8080)),
queue(queue) {
accept_connection();
};

// Don't allow the connection server to be copied or moved
Server(const Server&) =delete;
Server& operator=(const Server&) =delete;
Server(Server&&) noexcept =delete;
Server& operator=(Server&&) =delete;
Server(const Server &) = delete;

Server &operator=(const Server &) = delete;

Server(Server &&) noexcept = delete;

Server &operator=(Server &&) = delete;

private:
tcp::acceptor acceptor;
Expand Down
32 changes: 17 additions & 15 deletions BuilderQueue/src/BuilderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void BuilderQueue::return_builder(BuilderData builder) {

void BuilderQueue::process_pending_handler() {
Logger::info("Processing " + std::to_string(pending_handlers.size()) +
" handlers with " + std::to_string(reserve_builders.size()) + " reserve builders");
" handlers with " + std::to_string(reserve_builders.size()) + " reserve builders");

if (!pending_handlers.empty() && !reserve_builders.empty()) {
Logger::info("Processing pending handler");
Expand All @@ -39,15 +39,16 @@ void BuilderQueue::process_pending_handler() {
Logger::info("Providing builder to client: " + builder.id);

// Post the handler to pass the builder to the connection
asio::post(io_context, std::bind(handler,builder));
asio::post(io_context, std::bind(handler, builder));

// Attempt to create a new reserve builder if required
create_reserve_builders();
}
}

void BuilderQueue::create_reserve_builders() {
Logger::info("Checking reserve builder count with " + std::to_string(reserve_builders.size()) + " reserve builders");
Logger::info(
"Checking reserve builder count with " + std::to_string(reserve_builders.size()) + " reserve builders");

// Attempt to create the required number of reserve builders while staying below the total allowed builder count
auto potential_reserve_count = reserve_builders.size() + outstanding_create_count;
Expand All @@ -68,18 +69,19 @@ void BuilderQueue::create_reserve_builders() {
for (std::size_t i = 0; i < request_count; i++) {
Logger::info("Attempting to create builder " + std::to_string(i));

std::make_shared<OpenStack>(io_context)->request_create([this, i](std::error_code error, BuilderData builder) {
if (!error) {
outstanding_create_count--;
Logger::info("Created builder " + std::to_string(i) + ": " + builder.id);
reserve_builders.push_back(builder);
process_pending_handler();
} else {
Logger::error("Error creating builder, retrying in five seconds: " + std::to_string(i));
asio::deadline_timer timer(io_context, boost::posix_time::seconds(5));
timer.async_wait(std::bind(&BuilderQueue::create_reserve_builders, this));
}
});
std::make_shared<OpenStack>(io_context)->request_create(
[this, i](std::error_code error, BuilderData builder) {
if (!error) {
outstanding_create_count--;
Logger::info("Created builder " + std::to_string(i) + ": " + builder.id);
reserve_builders.push_back(builder);
process_pending_handler();
} else {
Logger::error("Error creating builder, retrying in five seconds: " + std::to_string(i));
asio::deadline_timer timer(io_context, boost::posix_time::seconds(5));
timer.async_wait(std::bind(&BuilderQueue::create_reserve_builders, this));
}
});
}
}
}
21 changes: 10 additions & 11 deletions BuilderQueue/src/Connection.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "Connection.h"
#include <boost/beast/core/buffers_to_string.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/core/ignore_unused.hpp>

using namespace std::placeholders;

Expand All @@ -10,10 +9,10 @@ void Connection::wait_for_close() {
auto self(shared_from_this());

Logger::info("Waiting for connection to close");
stream.async_read(buffer, [this, self] (beast::error_code error, std::size_t bytes) {
stream.async_read(buffer, [this, self](beast::error_code error, std::size_t bytes) {
boost::ignore_unused(bytes);

if(error == websocket::error::closed) {
if (error == websocket::error::closed) {
Logger::info("Cleaning closing connection");
} else {
Logger::error("Error closing connection: " + error.message());
Expand All @@ -34,9 +33,9 @@ void Connection::builder_ready(BuilderData builder) {
std::string request_string(serialized_builder);

Logger::info("Writing builder: " + builder.id);
stream.async_write(asio::buffer(request_string), [this, self, builder] (beast::error_code error, std::size_t bytes){
stream.async_write(asio::buffer(request_string), [this, self, builder](beast::error_code error, std::size_t bytes) {
boost::ignore_unused(bytes);
if(!error) {
if (!error) {
wait_for_close();
} else {
Logger::error("Error writing builder: " + builder.id);
Expand All @@ -51,7 +50,7 @@ void Connection::request_builder() {
Logger::info("Request to checkout builder made");
// Pass the provide_builder callback to the queue
// 'self' is passed to keep the connection alive as long as it's waiting for a builder in the queue
queue.checkout_builder([this, self] (BuilderData builder) {
queue.checkout_builder([this, self](BuilderData builder) {
builder_ready(builder);
});
}
Expand All @@ -61,12 +60,12 @@ void Connection::read_request_string() {
auto self(shared_from_this());

Logger::info("Reading initial request");
stream.async_read(buffer, [this, self] (beast::error_code error, std::size_t bytes) {
stream.async_read(buffer, [this, self](beast::error_code error, std::size_t bytes) {
boost::ignore_unused(bytes);
if(!error) {
if (!error) {
auto request = beast::buffers_to_string(buffer.data());
buffer.consume(buffer.size());
if(request == "checkout_builder_request") {
if (request == "checkout_builder_request") {
request_builder();
} else {
Logger::error("Bad initial request string: " + request);
Expand All @@ -83,8 +82,8 @@ void Connection::start() {

// Accept websocket handshake
Logger::info("waiting for client WebSocket handshake");
stream.async_accept([this, self](beast::error_code error){
if(!error) {
stream.async_accept([this, self](beast::error_code error) {
if (!error) {
Logger::info("Setting stream to binary mode");
stream.binary(true);
read_request_string();
Expand Down
4 changes: 2 additions & 2 deletions BuilderQueue/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ int main(int argc, char *argv[]) {
} catch (const boost::exception &ex) {
auto diagnostics = diagnostic_information(ex);
Logger::error(std::string() + "io_service exception encountered: " + diagnostics);
} catch (const std::exception& ex) {
} catch (const std::exception &ex) {
Logger::error(std::string() + "io_service exception encountered: " + ex.what());
} catch(...) {
} catch (...) {
Logger::error("Unknown io_service exception encountered");
}
}
Expand Down
Loading

0 comments on commit 70f6232

Please sign in to comment.