Skip to content

Commit

Permalink
Merge pull request #175 from acompany-develop/feature/nakata/clean_db…
Browse files Browse the repository at this point in the history
…client

clean dbclient
  • Loading branch information
mdonaka authored Apr 24, 2023
2 parents 4089f32 + 61133ba commit f596060
Show file tree
Hide file tree
Showing 19 changed files with 189 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,3 @@ cc_library(
],
visibility = ["//visibility:public"],
)

# C++ library target "jointable": compiles join_table.cpp and exposes
# join_table.hpp as its header.
cc_library(
name = "jointable",
srcs = [
"join_table.cpp"
],
hdrs = [
"join_table.hpp",
],
# External deps (JSON, gRPC proto) followed by in-repo targets, including the
# sibling "client" and "valuetable" targets of //client/computation_to_db.
deps = [
"@nlohmann_json//:json",
"@proto//manage_to_computation_container:manage_to_computation_cc_grpc",
"//share:share",
"//share:compare",
"//job:progressManager",
"//logging:log",
"//client/computation_to_db:client",
"//client/computation_to_db:valuetable",
],
# Any package may depend on this target.
visibility = ["//visibility:public"],
)

# C++ library target "valuetable": compiles value_table.cpp and exposes
# value_table.hpp as its header.
cc_library(
name = "valuetable",
srcs = [
"value_table.cpp"
],
hdrs = [
"value_table.hpp",
],
# Depends on the logging target and the "client" target of
# //client/computation_to_db.
deps = [
"//logging:log",
"//client/computation_to_db:client",
],
# Any package may depend on this target.
visibility = ["//visibility:public"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
namespace qmpc::ComputationToDb
{

namespace fs = std::experimental::filesystem;

nlohmann::json convertSchemaToJson(const qmpc::ComputationToDb::SchemaType &src)
{
pb_common_types::Schema col_pb;
Expand Down Expand Up @@ -44,43 +42,35 @@ std::vector<nlohmann::json> convertSchemaVectorToJsonVector(
return dst;
}

/************ Client::ComputationResultWriter ************/
Client::ComputationResultWriter::ComputationResultWriter(
const std::string &job_uuid, int key, int column_number, int piece_size
/************ ComputationResultWriter ************/
ComputationResultWriter::ComputationResultWriter(
const std::string &job_uuid, int data_type, int column_number, int piece_size
)
: current_size(0)
, piece_id(0)
, job_uuid(job_uuid)
, data_name(
(key == 0) ? "dim1"
: (key == 1) ? "dim2"
: "schema"
)
, data_type(data_type)
// NOTE: Client側で復元する際に0以下だと不都合が生じるため
, column_number(std::max(1, column_number))
, piece_size(piece_size)
{
}

void Client::ComputationResultWriter::write()
void ComputationResultWriter::write()
{
nlohmann::json piece_data_json = {
{"job_uuid", job_uuid},
{"result", piece_data},
{"meta", {{"piece_id", piece_id}, {"column_number", column_number}}}};
const std::string data = piece_data_json.dump();

auto ofs =
std::ofstream(resultDbPath + job_uuid + "/" + data_name + "_" + std::to_string(piece_id));
ofs << data;
ofs.close();
Client::getInstance()->writeResultDB(job_uuid, data, data_type, piece_id);

++piece_id;
current_size = 0;
piece_data.clear();
}

void Client::ComputationResultWriter::emplace(const std::string &s)
void ComputationResultWriter::emplace(const std::string &s)
{
int size = s.size();
if (current_size + size >= piece_size)
Expand All @@ -90,18 +80,16 @@ void Client::ComputationResultWriter::emplace(const std::string &s)
piece_data.emplace_back(s);
current_size += size;
}
void Client::ComputationResultWriter::emplace(const std::vector<std::string> &v)
{
for (const auto &x : v)
{
emplace(x);
}
}
void Client::ComputationResultWriter::emplace(const SchemaType &s)
void ComputationResultWriter::emplace(const SchemaType &s)
{
auto json = convertSchemaToJson(s);
emplace(json.dump());
}
// Finishes writing a result: writes out the currently buffered piece via
// write(), then asks the DB client to register the job as completed.
void ComputationResultWriter::completed()
{
    // Write whatever is still buffered before marking the job as done.
    write();

    auto db_client = Client::getInstance();
    db_client->updateJobCompleted(job_uuid);
}

/************ TableWriter ************/
TableWriter::TableWriter(const std::string &data_id, int piece_size)
Expand Down Expand Up @@ -200,7 +188,7 @@ std::optional<std::vector<std::vector<std::string>>> Client::readTable(
const std::string &data_id, int piece_id
) const
{
auto data_path = shareDbPath + data_id + "/" + std::to_string(piece_id);
auto data_path = shareDbPath / data_id / std::to_string(piece_id);
if (!fs::exists(data_path))
{
return std::nullopt;
Expand All @@ -223,7 +211,7 @@ std::optional<std::vector<std::vector<std::string>>> Client::readTable(
std::vector<SchemaType> Client::readSchema(const std::string &data_id) const
{
// DBから値を取り出す
auto ifs = std::ifstream(shareDbPath + data_id + "/0");
auto ifs = std::ifstream(shareDbPath / data_id / "0");
std::string data;
getline(ifs, data);
auto json = nlohmann::json::parse(data);
Expand All @@ -235,16 +223,29 @@ std::vector<SchemaType> Client::readSchema(const std::string &data_id) const
// shareDBに対してdataを書き込む
void Client::writeShareDB(const std::string &data_id, const std::string &data, int piece_id)
{
fs::create_directories(shareDbPath + data_id);
auto ofs = std::ofstream(fs::path(shareDbPath) / data_id / std::to_string(piece_id));
fs::create_directories(shareDbPath / data_id);
auto ofs = std::ofstream(shareDbPath / data_id / std::to_string(piece_id));
ofs << data;
ofs.close();
}

// Writes `data` into the result DB as one piece of a job's result.
//
// The file is placed under resultDbPath/<job_uuid>/ and named
// "<kind>_<piece_id>", where <kind> is "dim1" for data_type 0, "dim2" for
// data_type 1 and "schema" for any other value.
void Client::writeResultDB(
    const std::string &job_uuid, const std::string &data, int data_type, int piece_id
)
{
    const auto job_dir = resultDbPath / job_uuid;
    fs::create_directories(job_dir);

    // Map the numeric data type onto the file-name prefix.
    std::string file_name;
    switch (data_type)
    {
        case 0:
            file_name = "dim1";
            break;
        case 1:
            file_name = "dim2";
            break;
        default:
            file_name = "schema";
            break;
    }
    file_name += "_" + std::to_string(piece_id);

    std::ofstream out(job_dir / file_name);
    out << data;
    out.close();
}

// Job を DB に新規登録する
void Client::registerJob(const std::string &job_uuid, const int &status) const
{
fs::create_directories(resultDbPath + job_uuid);
fs::create_directories(resultDbPath / job_uuid);
updateJobStatus(job_uuid, status);
}

Expand All @@ -254,25 +255,29 @@ void Client::updateJobStatus(const std::string &job_uuid, const int &status) con
const google::protobuf::EnumDescriptor *descriptor =
google::protobuf::GetEnumDescriptor<pb_common_types::JobStatus>();

std::ofstream ofs(
resultDbPath + job_uuid + "/status_" + descriptor->FindValueByNumber(status)->name()
);
auto status_file = "status_" + descriptor->FindValueByNumber(status)->name();
std::ofstream ofs(resultDbPath / job_uuid / status_file);

std::chrono::system_clock::time_point tp = std::chrono::system_clock::now();
auto tp_msec = std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch());
ofs << tp_msec.count();
}

// Registers the completion of a job by opening (and thereby creating) the
// file resultDbPath/<job_uuid>/completed. Nothing is written to it.
void Client::updateJobCompleted(const std::string &job_uuid) const
{
    const auto marker_path = resultDbPath / job_uuid / "completed";
    // Opening the stream for output is the whole effect; it is closed again
    // when `marker` goes out of scope.
    std::ofstream marker(marker_path);
}

void Client::saveErrorInfo(const std::string &job_uuid, const pb_common_types::JobErrorInfo &info)
const
{
const google::protobuf::EnumDescriptor *descriptor =
google::protobuf::GetEnumDescriptor<pb_common_types::JobStatus>();

std::ofstream ofs(
resultDbPath + job_uuid + "/status_"
+ descriptor->FindValueByNumber(pb_common_types::JobStatus::ERROR)->name()
);
auto status_file =
"status_" + descriptor->FindValueByNumber(pb_common_types::JobStatus::ERROR)->name();
std::ofstream ofs(resultDbPath / job_uuid / status_file);

static const auto options = []()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <experimental/filesystem>
#include <fstream>
#include <memory>
#include <optional>
Expand All @@ -13,35 +14,16 @@

namespace qmpc::ComputationToDb
{
namespace fs = std::experimental::filesystem;
using SchemaType = std::tuple<std::string, pb_common_types::ShareValueTypeEnum>;

std::vector<nlohmann::json>
convertSchemaVectorToJsonVector(const std::vector<qmpc::ComputationToDb::SchemaType> &);

class Client final
{
static inline const std::string shareDbPath = "/db/share/";
static inline const std::string resultDbPath = "/db/result/";

class ComputationResultWriter
{
int current_size;
int piece_id;
std::vector<std::string> piece_data;

const std::string job_uuid;
const std::string data_name;
const int column_number;
const int piece_size;

public:
ComputationResultWriter(const std::string &, int, int, int);
void write();

void emplace(const std::string &);
void emplace(const std::vector<std::string> &);
void emplace(const SchemaType &);
};
static inline const auto shareDbPath = fs::path("/db/share/");
static inline const auto resultDbPath = fs::path("/db/result/");

public:
Client();
Expand All @@ -53,40 +35,54 @@ class Client final

// shareDBに対してdataを書き込む
void writeShareDB(const std::string &data_id, const std::string &data, int piece_id = 0);
// resultDBに対してdataを書き込む
void writeResultDB(
const std::string &job_uuid, const std::string &data, int data_type, int piece_id = 0
);

// Job を DB に新規登録する
void registerJob(const std::string &job_uuid, const int &status) const;

// Job の実行状態を更新する
void updateJobStatus(const std::string &job_uuid, const int &status) const;

// Job の完了を登録する
void updateJobCompleted(const std::string &job_uuid) const;

// Job 実行中に発生したエラーに関する情報を保存する
void saveErrorInfo(const std::string &job_uuid, const pb_common_types::JobErrorInfo &info)
const;
};

// 計算結果のWriter
class ComputationResultWriter
{
int current_size;
int piece_id;
std::vector<std::string> piece_data;

const std::string job_uuid;
const int data_type;
const int column_number;
const int piece_size;

void write();

// 保存時にデフォルトで呼ばれる恒等関数
static inline auto identity = [](const auto &t) { return t; };
public:
ComputationResultWriter(const std::string &, int, int, int piece_size = 1000000);

// resultの保存
// NOTE: result_listにbegin()とend()が実装されている必要がある
template <class T, class F = decltype(identity)>
void writeComputationResult(
const std::string &job_uuid,
const T &result_list,
int data_type, // 0:dim1, 1:dim2, 2:schema
int column_number,
const F &f = identity, // 保存時にitrごとに加工したい場合に指定する
int piece_size = 1000000
) const
template <class Iteratable>
void emplace(const Iteratable &v)
{
auto writer = ComputationResultWriter(job_uuid, data_type, column_number, piece_size);
for (const auto &x : result_list)
for (const auto &x : v)
{
writer.emplace(f(x));
emplace(x);
}
writer.write();
std::ofstream(resultDbPath + job_uuid + "/completed");
}
void emplace(const std::string &);
void emplace(const SchemaType &);
void completed();
};

// TableWriterとComputationResultWriterは本質的には同じことをしているが,
Expand Down
2 changes: 1 addition & 1 deletion packages/server/computation_container/job/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ cc_library(
"//computation:computation",
"//share:share",
"//client/computation_to_db:client",
"//client/computation_to_db:jointable",
"//value_table:jointable",
"@proto//manage_to_computation_container:manage_to_computation_cc_grpc",
"//logging:log"
],
Expand Down
2 changes: 1 addition & 1 deletion packages/server/computation_container/job/job_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
#include <boost/range/adaptor/indexed.hpp>
#include <boost/range/adaptor/transformed.hpp>

#include "client/computation_to_db/join_table.hpp"
#include "external/proto/manage_to_computation_container/manage_to_computation.grpc.pb.h"
#include "job_parameter.hpp"
#include "job_status.hpp"
#include "logging/logger.hpp"
#include "share/share.hpp"
#include "value_table/join_table.hpp"

namespace qmpc::Job
{
Expand Down
6 changes: 4 additions & 2 deletions packages/server/computation_container/job/jobs/correl_job.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ class CorrelJob : public JobBase<CorrelJob>

auto results = toString(ret);
auto column_number = (results.empty() ? -1 : results[0].size());
auto db_client = qmpc::ComputationToDb::Client::getInstance();
db_client->writeComputationResult(job_uuid, results, 1, column_number);
auto computationResultWriter =
qmpc::ComputationToDb::ComputationResultWriter(job_uuid, 1, column_number);
computationResultWriter.emplace(results);
computationResultWriter.completed();
}
};
} // namespace qmpc::Job
20 changes: 14 additions & 6 deletions packages/server/computation_container/job/jobs/join_table_job.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,20 @@ class JoinTableJob : public JobBase<JoinTableJob>
auto db_client = qmpc::ComputationToDb::Client::getInstance();
auto column_number = new_schemas.size();

// argを束縛した関数を生成する
auto f = std::bind(removeIdColumn<std::vector<std::string>>, arg, std::placeholders::_1);
// tableの保存
db_client->writeComputationResult(job_uuid, table, 1, column_number, f);
// schemaの保存
db_client->writeComputationResult(job_uuid, new_schemas, 2, column_number);
// ID列を削除してTableを保存
auto computationResultWriterTable =
qmpc::ComputationToDb::ComputationResultWriter(job_uuid, 1, column_number);
for (const auto &row : table)
{
computationResultWriterTable.emplace(removeIdColumn(arg, row));
}
computationResultWriterTable.completed();

// Schemaを保存
auto computationResultWriterSchema =
qmpc::ComputationToDb::ComputationResultWriter(job_uuid, 2, column_number);
computationResultWriterSchema.emplace(new_schemas);
computationResultWriterSchema.completed();
}
};
} // namespace qmpc::Job
Loading

0 comments on commit f596060

Please sign in to comment.