Skip to content

Commit

Permalink
Move DumpRestore functionality to TableArchiver
Browse files Browse the repository at this point in the history
Separates dump/restore from catalog, preventing catalog and parser from depending on each other for linking.
  • Loading branch information
alexbaden authored and andrewseidl committed Mar 20, 2020
1 parent b7c4755 commit d870ae6
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 274 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ add_subdirectory(Shared)
add_subdirectory(Utils)
add_subdirectory(QueryRunner)
add_subdirectory(SQLFrontend)
add_subdirectory(TableArchiver)
add_subdirectory(ThriftHandler)

add_subdirectory(Distributed)
Expand All @@ -496,7 +497,7 @@ if(ENABLE_ODBC)
add_subdirectory(ODBC)
endif()

set(MAPD_LIBRARIES Shared Catalog SqliteConnector MigrationMgr Parser Analyzer CsvImport QueryRunner QueryEngine QueryState LockMgr DataMgr Fragmenter Chunk)
set(MAPD_LIBRARIES Shared Catalog SqliteConnector MigrationMgr TableArchiver Parser Analyzer CsvImport QueryRunner QueryEngine QueryState LockMgr DataMgr Fragmenter Chunk)

if("${MAPD_EDITION_LOWER}" STREQUAL "ee")
list(APPEND MAPD_LIBRARIES Distributed)
Expand Down
1 change: 0 additions & 1 deletion Catalog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ set(catalog_source_files
Catalog.cpp
Catalog.h
DBObject.cpp
DumpRestore.cpp
Grantee.cpp
Grantee.h
SessionInfo.cpp
Expand Down
133 changes: 133 additions & 0 deletions Catalog/Catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
#include "QueryEngine/Execute.h"
#include "QueryEngine/TableOptimizer.h"

#include "DataMgr/FileMgr/FileMgr.h"
#include "DataMgr/FileMgr/GlobalFileMgr.h"
#include "DataMgr/ForeignStorage/ForeignStorageInterface.h"
#include "Fragmenter/Fragmenter.h"
#include "Fragmenter/SortedOrderFragmenter.h"
Expand Down Expand Up @@ -3505,4 +3507,135 @@ void Catalog::setForReload(const int32_t tableId) {
setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
}
}

// get a table's data dirs
std::vector<std::string> Catalog::getTableDataDirectories(
const TableDescriptor* td) const {
const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
std::vector<std::string> file_paths;
for (auto shard : getPhysicalTablesDescriptors(td)) {
const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
file_paths.push_back(file_path.filename().string());
}
return file_paths;
}

// get a column's dict dir basename
std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd) const {
if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
cd->columnType.get_compression() == kENCODING_DICT &&
cd->columnType.get_comp_param() > 0) {
const auto dictId = cd->columnType.get_comp_param();
const DictRef dictRef(currentDB_.dbId, dictId);
const auto dit = dictDescriptorMapByRef_.find(dictRef);
CHECK(dit != dictDescriptorMapByRef_.end());
CHECK(dit->second);
boost::filesystem::path file_path(dit->second->dictFolderPath);
return file_path.filename().string();
}
return std::string();
}

/**
 * Lists the distinct dictionary directory basenames used by a table's columns.
 *
 * Each column is passed through getColumnDictDirectory(); empty results are skipped
 * and duplicates are dropped, preserving first-seen order.
 *
 * @param td descriptor of the table whose dictionary directories are requested
 * @return unique dictionary directory basenames in column iteration order
 */
std::vector<std::string> Catalog::getTableDictDirectories(
    const TableDescriptor* td) const {
  std::vector<std::string> dict_dirs;
  // NOTE(review): the three boolean flags select which column categories are
  // fetched; their exact meaning is defined by getAllColumnMetadataForTable.
  const auto columns = getAllColumnMetadataForTable(td->tableId, false, false, true);
  for (const auto column : columns) {
    const auto dir_name = getColumnDictDirectory(column);
    if (dir_name.empty()) {
      continue;
    }
    const bool already_listed =
        std::find(dict_dirs.begin(), dict_dirs.end(), dir_name) != dict_dirs.end();
    if (!already_listed) {
      dict_dirs.push_back(dir_name);
    }
  }
  return dict_dirs;
}

// returns table schema in a string
std::string Catalog::dumpSchema(const TableDescriptor* td) const {
cat_read_lock read_lock(this);

std::ostringstream os;
os << "CREATE TABLE @T (";
// gather column defines
const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
std::string comma;
std::vector<std::string> shared_dicts;
std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
for (const auto cd : cds) {
if (!(cd->isSystemCol || cd->isVirtualCol)) {
const auto& ti = cd->columnType;
os << comma << cd->columnName;
// CHAR is perculiar... better dump it as TEXT(32) like \d does
if (ti.get_type() == SQLTypes::kCHAR) {
os << " "
<< "TEXT";
} else if (ti.get_subtype() == SQLTypes::kCHAR) {
os << " "
<< "TEXT[]";
} else {
os << " " << ti.get_type_name();
}
os << (ti.get_notnull() ? " NOT NULL" : "");
if (ti.is_string()) {
if (ti.get_compression() == kENCODING_DICT) {
// if foreign reference, get referenced tab.col
const auto dict_id = ti.get_comp_param();
const DictRef dict_ref(currentDB_.dbId, dict_id);
const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
CHECK(dict_it != dictDescriptorMapByRef_.end());
const auto dict_name = dict_it->second->dictName;
// when migrating a table, any foreign dict ref will be dropped
// and the first cd of a dict will become root of the dict
if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
dict_root_cds[dict_name] = cd;
os << " ENCODING " << ti.get_compression_name() << "(" << (ti.get_size() * 8)
<< ")";
} else {
const auto dict_root_cd = dict_root_cds[dict_name];
shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
") REFERENCES @T(" + dict_root_cd->columnName + ")");
// "... shouldn't specify an encoding, it borrows from the referenced column"
}
} else {
os << " ENCODING NONE";
}
} else if (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size()) {
const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
}
comma = ", ";
}
}
// gather SHARED DICTIONARYs
if (shared_dicts.size()) {
os << ", " << boost::algorithm::join(shared_dicts, ", ");
}
// gather WITH options ...
std::vector<std::string> with_options;
with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
: "VACUUM='IMMEDIATE'");
if (!td->partitions.empty()) {
with_options.push_back("PARTITIONS='" + td->partitions + "'");
}
if (td->nShards > 0) {
const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
CHECK(shard_cd);
os << ", SHARD KEY(" << shard_cd->columnName << ")";
with_options.push_back("SHARD_COUNT=" + std::to_string(td->nShards));
}
if (td->sortedColumnId > 0) {
const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
CHECK(sort_cd);
with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
}
os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
return os.str();
}

} // namespace Catalog_Namespace
15 changes: 3 additions & 12 deletions Catalog/Catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class SharedDictionaryDef;

} // namespace Parser

class TableArchiver;

// SPI means Sequential Positional Index which is equivalent to the input index in a
// RexInput node
#define SPIMAP_MAGIC1 (std::numeric_limits<unsigned>::max() / 4)
Expand Down Expand Up @@ -224,18 +226,7 @@ class Catalog final {
void vacuumDeletedRows(const TableDescriptor* td) const;
void vacuumDeletedRows(const int logicalTableId) const;
void setForReload(const int32_t tableId);
// dump & restore
void dumpTable(const TableDescriptor* td,
const std::string& path,
const std::string& compression) const;
void restoreTable(const SessionInfo& session,
const TableDescriptor* td,
const std::string& file_path,
const std::string& compression);
void restoreTable(const SessionInfo& session,
const std::string& table_name,
const std::string& file_path,
const std::string& compression);

std::vector<std::string> getTableDataDirectories(const TableDescriptor* td) const;
std::vector<std::string> getTableDictDirectories(const TableDescriptor* td) const;
std::string getColumnDictDirectory(const ColumnDescriptor* cd) const;
Expand Down
2 changes: 1 addition & 1 deletion Parser/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,5 @@ add_dependencies(PatchScanner ScannerFiles)

add_dependencies(Parser PatchParser PatchScanner)

target_link_libraries(Parser ParserGenerated QueryState CsvImport Shared Analyzer QueryEngine Catalog DataMgr LockMgr)
target_link_libraries(Parser ParserGenerated QueryState CsvImport Shared Analyzer QueryEngine Catalog TableArchiver DataMgr LockMgr)
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
7 changes: 5 additions & 2 deletions Parser/ParserNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include "Shared/mapd_glob.h"
#include "Shared/measure.h"
#include "Shared/shard_key.h"
#include "TableArchiver/TableArchiver.h"
#include "gen-cpp/CalciteServer.h"
#include "parser.h"

Expand Down Expand Up @@ -4453,7 +4454,8 @@ void DumpTableStmt::execute(const Catalog_Namespace::SessionInfo& session) {
}
auto& catalog = session.getCatalog();
const TableDescriptor* td = catalog.getMetadataForTable(*table);
catalog.dumpTable(td, *path, compression);
TableArchiver table_archiver(&catalog);
table_archiver.dumpTable(td, *path, compression);
}

void RestoreTableStmt::execute(const Catalog_Namespace::SessionInfo& session) {
Expand All @@ -4471,7 +4473,8 @@ void RestoreTableStmt::execute(const Catalog_Namespace::SessionInfo& session) {
throw std::runtime_error("Table " + *table +
" will not be restored. User has no create privileges.");
}
catalog.restoreTable(session, *table, *path, compression);
TableArchiver table_archiver(&catalog);
table_archiver.restoreTable(session, *table, *path, compression);
}
}

Expand Down
8 changes: 8 additions & 0 deletions TableArchiver/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
set(table_archive_source_files
TableArchiver.cpp
)

add_library(TableArchiver ${table_archive_source_files})

target_link_libraries(TableArchiver Catalog Parser Shared)

Loading

0 comments on commit d870ae6

Please sign in to comment.