Skip to content

Commit

Permalink
Import sample geo tables automatically with initdb
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbaden authored and andrewseidl committed May 15, 2018
1 parent fb8cf86 commit 85ec451
Show file tree
Hide file tree
Showing 26 changed files with 514 additions and 327 deletions.
24 changes: 11 additions & 13 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ else()
include_directories(Distributed/os)
endif()

set(MAPD_RENDERING_LIBRARIES "")
if(NOT "${MAPD_EDITION_LOWER}" STREQUAL "os")
option(ENABLE_RENDERING "Build backend renderer" OFF)
if(ENABLE_RENDERING)
Expand Down Expand Up @@ -367,6 +368,7 @@ add_subdirectory(Fragmenter)
add_subdirectory(Chunk)
add_subdirectory(Shared)
add_subdirectory(Utils)
add_subdirectory(QueryRunner)
add_subdirectory(SQLFrontend)
add_subdirectory(ThriftHandler)

Expand All @@ -377,25 +379,24 @@ endif()

add_subdirectory(Calcite)

set(MAPD_LIBRARIES Shared Catalog SqliteConnector Parser Analyzer Planner
QueryEngine DataMgr Fragmenter Chunk)
set(MAPD_LIBRARIES Shared Catalog SqliteConnector Parser Analyzer Planner CsvImport QueryRunner QueryEngine DataMgr Fragmenter Chunk)

if("${MAPD_EDITION_LOWER}" STREQUAL "ee")
list(APPEND MAPD_LIBRARIES Distributed)
endif()

if(ENABLE_FOLLY)
list(APPEND MAPD_LIBRARIES ${Folly_LIBRARIES})
endif()

list(APPEND MAPD_LIBRARIES Calcite)

list(APPEND MAPD_LIBRARIES ${Arrow_LIBRARIES})

if(ENABLE_RENDERING)
list(APPEND MAPD_LIBRARIES "${MAPD_RENDERING_LIBRARIES}")
endif()

list(APPEND MAPD_LIBRARIES ${Arrow_LIBRARIES})

if(ENABLE_FOLLY)
list(APPEND MAPD_LIBRARIES ${Folly_LIBRARIES})
endif()

if(ENABLE_LICENSING_AWS)
list(APPEND MAPD_LIBRARIES AWSMarketplace)
endif()
Expand Down Expand Up @@ -459,11 +460,8 @@ add_custom_target(rerun_cmake ALL
add_dependencies(mapd_server rerun_cmake)

target_link_libraries(mapd_server mapd_thrift thrift_handler ${MAPD_LIBRARIES} ${Boost_LIBRARIES} ${Glog_LIBRARIES} ${CMAKE_DL_LIBS} ${CUDA_LIBRARIES} ${LLVM_LINKER_FLAGS} ${PROFILER_LIBS} ${CURSES_LIBRARIES} ${ZLIB_LIBRARIES})
if(ENABLE_FOLLY)
target_link_libraries(initdb Catalog Fragmenter Shared Chunk SqliteConnector DataMgr StringDictionary ${Boost_LIBRARIES} ${ZLIB_LIBRARIES} ${Folly_LIBRARIES} ${CMAKE_DL_LIBS})
else()
target_link_libraries(initdb Catalog Fragmenter Shared Chunk SqliteConnector DataMgr StringDictionary ${Boost_LIBRARIES} ${ZLIB_LIBRARIES} ${CMAKE_DL_LIBS})
endif()

target_link_libraries(initdb ${MAPD_LIBRARIES} ${Boost_LIBRARIES} ${Glog_LIBRARIES} ${CMAKE_DL_LIBS} ${CUDA_LIBRARIES} ${LLVM_LINKER_FLAGS} ${CURSES_LIBRARIES} ${ZLIB_LIBRARIES})

macro(set_dpkg_arch arch_in arch_out)
if("${arch_in}" STREQUAL "x86_64")
Expand Down
3 changes: 3 additions & 0 deletions Import/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ target_link_libraries(CsvImport mapd_thrift Shared Catalog Chunk DataMgr StringD
install(DIRECTORY ${CMAKE_SOURCE_DIR}/ThirdParty/gdal-data DESTINATION "ThirdParty")
add_custom_target(gdal-data ALL COMMAND ${CMAKE_COMMAND} -E copy_directory "${CMAKE_SOURCE_DIR}/ThirdParty/gdal-data" "${CMAKE_BINARY_DIR}/ThirdParty/gdal-data")

# Ship the sample geo files with the install tree, and mirror them into the
# build tree so binaries run from the build directory can find them too.
install(DIRECTORY ${CMAKE_SOURCE_DIR}/ThirdParty/geo_samples DESTINATION "ThirdParty")
# VERBATIM keeps argument escaping identical across generators/platforms
# (matters when the source or binary path contains spaces or shell metacharacters).
add_custom_target(geo_samples ALL
  COMMAND ${CMAKE_COMMAND} -E copy_directory "${CMAKE_SOURCE_DIR}/ThirdParty/geo_samples" "${CMAKE_BINARY_DIR}/ThirdParty/geo_samples"
  VERBATIM)

add_library(RowToColumn RowToColumnLoader.cpp RowToColumnLoader.h)
add_executable(StreamImporter StreamImporter.cpp)
target_link_libraries(StreamImporter RowToColumn mapd_thrift Shared ${Glog_LIBRARIES} ${CMAKE_DL_LIBS} ${Boost_LIBRARIES})
Expand Down
74 changes: 74 additions & 0 deletions Import/Importer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@
#include "../Shared/geosupport.h"
#include "../Shared/mapd_glob.h"
#include "../Shared/scope.h"
#include "../Shared/import_helpers.h"

#include "Importer.h"
#include "DataMgr/LockMgr.h"
#include "QueryRunner/QueryRunner.h"
#include "Utils/ChunkAccessorTable.h"
#include "gen-cpp/MapD.h"
#include <vector>
Expand Down Expand Up @@ -3915,4 +3918,75 @@ int RenderGroupAnalyzer::insertCoordsAndReturnRenderGroup(const std::vector<doub
return firstAvailableRenderGroup;
}

/**
 * Creates a new table named `table_name` whose columns are derived from the geo file at
 * `file_path`, then imports the file's contents into that table.
 *
 * Column names reported for the file are passed through ImportHelpers::sanitize_name before
 * being used in the generated CREATE TABLE statement.
 *
 * @param file_path   path handed to Importer::gdalToColumnDescriptors and to the Importer
 * @param table_name  name of the table to create; must already be in sanitized form
 *
 * @throws std::runtime_error if the table already exists, if `table_name` contains characters
 *         that sanitize_name would change, or if a column has an INTERVAL type.
 */
void ImportDriver::import_geo_table(const std::string& file_path, const std::string& table_name) {
  const std::string geo_column_name(MAPD_GEO_PREFIX);

  CopyParams copy_params;
  // Import as 32-bit GEOINT encoding by default
  copy_params.geo_coords_encoding = EncodingType::kENCODING_GEOINT;
  copy_params.geo_coords_comp_param = 32;

  // Non-const and iterated by reference: the sanitized name must be written back into the
  // descriptor itself, because the CREATE TABLE statement below is built from `cds`.
  // (Iterating by value only renamed a temporary copy.)
  auto cds = Importer::gdalToColumnDescriptors(file_path, geo_column_name, copy_params);
  std::map<std::string, std::string> colname_to_src;
  for (auto& cd : cds) {
    const auto col_name_sanitized = ImportHelpers::sanitize_name(cd.columnName);
    // Key: sanitized table column name, value: original name in the source file.
    // NOTE(review): direction follows the map's name (column name -> source); confirm
    // against the lookup performed inside Importer::importGDAL.
    const auto ret = colname_to_src.insert(std::make_pair(col_name_sanitized, cd.columnName));
    // Two source columns that sanitize to the same name cannot both be created.
    CHECK(ret.second);
    cd.columnName = col_name_sanitized;
  }

  auto& cat = session_->get_catalog();
  const auto td = cat.getMetadataForTable(table_name);
  if (td != nullptr) {
    throw std::runtime_error("Error: Table " + table_name +
                             " already exists. Possible failure to correctly re-create mapd_data directory.");
  }
  // The table name is spliced into the DDL text below, so reject anything sanitize_name would alter.
  if (table_name != ImportHelpers::sanitize_name(table_name)) {
    throw std::runtime_error("Invalid characters in table name: " + table_name);
  }

  std::string stmt{"CREATE TABLE " + table_name};
  std::vector<std::string> col_stmts;

  // Deliberately iterates by value: the DECIMAL defaults below are applied to a local copy only.
  for (auto col : cds) {
    if (col.columnType.get_type() == SQLTypes::kINTERVAL_DAY_TIME ||
        col.columnType.get_type() == SQLTypes::kINTERVAL_YEAR_MONTH) {
      throw std::runtime_error("Unsupported type: INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH for col " + col.columnName +
                               " (table: " + table_name + ")");
    }

    // A DECIMAL with neither precision nor scale set gets a default of DECIMAL(14, 7).
    if (col.columnType.get_type() == SQLTypes::kDECIMAL) {
      if (col.columnType.get_precision() == 0 && col.columnType.get_scale() == 0) {
        col.columnType.set_precision(14);
        col.columnType.set_scale(7);
      }
    }

    std::string col_stmt;
    col_stmt.append(col.columnName + " " + col.columnType.get_type_name() + " ");

    if (col.columnType.get_compression() != EncodingType::kENCODING_NONE) {
      col_stmt.append("ENCODING " + col.columnType.get_compression_name() + " ");
    } else {
      // Uncompressed: spell out ENCODING NONE for strings and for geometry with output SRID 4326;
      // every other column gets no ENCODING clause.
      if (col.columnType.is_string()) {
        col_stmt.append("ENCODING NONE");
      } else if (col.columnType.is_geometry()) {
        if (col.columnType.get_output_srid() == 4326) {
          col_stmt.append("ENCODING NONE");
        }
      }
    }
    col_stmts.push_back(col_stmt);
  }

  stmt.append(" (" + boost::algorithm::join(col_stmts, ",") + ");");
  QueryRunner::run_ddl_statement(stmt, session_);

  LOG(INFO) << "Created table: " << table_name;
  // Re-fetch the metadata: `td` above was null by construction.
  const auto new_td = cat.getMetadataForTable(table_name);
  Importer_NS::Importer importer(cat, new_td, file_path, copy_params);
  auto ms = measure<>::execution([&]() { importer.importGDAL(colname_to_src); });
  LOG(INFO) << "Import Time for " << table_name << ": " << static_cast<double>(ms) / 1000.0 << " s";
}

}  // namespace Importer_NS
15 changes: 15 additions & 0 deletions Import/Importer.h
Original file line number Diff line number Diff line change
Expand Up @@ -772,5 +772,20 @@ class Importer : public DataStreamSink {
std::unique_ptr<Loader> loader;
std::unique_ptr<bool[]> is_array_a;
};

// Small driver that owns a catalog session and uses it to create and populate
// a geo table from a file (see import_geo_table).
class ImportDriver {
public:
// Builds the owned SessionInfo from catalog `c`, `user` and executor device type `t`
// (defaults to ExecutorDeviceType::GPU). The fourth SessionInfo argument is passed as an
// empty string -- presumably the session id; TODO confirm against SessionInfo's constructor.
ImportDriver(std::shared_ptr<Catalog_Namespace::Catalog> c,
const Catalog_Namespace::UserMetadata& user,
const ExecutorDeviceType t = ExecutorDeviceType::GPU)
: session_(new Catalog_Namespace::SessionInfo(c, user, t, "")) {}

// Creates table `table_name` from the geo file at `file_path` and imports the file into it.
void import_geo_table(const std::string& file_path, const std::string& table_name);

private:
// Session used for catalog access and for running the generated DDL; owned by this object.
std::unique_ptr<Catalog_Namespace::SessionInfo> session_;
};

} // namespace Importer_NS

#endif // _IMPORTER_H_
2 changes: 1 addition & 1 deletion Parser/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ set(parser_source_files

add_library(Parser ${BisonppOutput} ${FlexppOutput} ${CMAKE_CURRENT_BINARY_DIR}/parser.h ${parser_source_files})
add_dependencies(Parser ParserFiles ScannerFiles)
target_link_libraries(Parser Shared CsvImport QueryEngine)
target_link_libraries(Parser Shared QueryEngine)
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
7 changes: 1 addition & 6 deletions QueryEngine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,7 @@ endif()

set(ARROW_LIBS ${Arrow_LIBRARIES})

if(ENABLE_RENDERING)
target_link_libraries(QueryEngine CsvImport Planner StringDictionary Utils QueryRenderer ${ARROW_LIBS})
else()
target_link_libraries(QueryEngine CsvImport Planner StringDictionary Utils ${ARROW_LIBS})
endif()

target_link_libraries(QueryEngine Planner StringDictionary Utils ${ARROW_LIBS})

add_custom_command(
DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/cuda_mapd_rt.o
Expand Down
6 changes: 6 additions & 0 deletions QueryRunner/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# QueryRunner library: built from QueryRunner.cpp and linked against the
# Boost libraries listed in Boost_LIBRARIES.
add_library(QueryRunner
  QueryRunner.cpp
)
target_link_libraries(QueryRunner ${Boost_LIBRARIES})
103 changes: 61 additions & 42 deletions Tests/QueryRunner.cpp → QueryRunner/QueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,19 @@

#include "QueryRunner.h"

#include "../Parser/parser.h"
#include "../QueryEngine/CalciteAdapter.h"
#include "../Parser/ParserWrapper.h"
#include "../Calcite/Calcite.h"
#include "Parser/parser.h"
#include "QueryEngine/CalciteAdapter.h"
#include "Parser/ParserWrapper.h"
#include "Calcite/Calcite.h"
#include "Catalog/Catalog.h"

#include "../QueryEngine/ExtensionFunctionsWhitelist.h"
#include "../QueryEngine/RelAlgExecutor.h"
#include "QueryEngine/ExtensionFunctionsWhitelist.h"
#include "QueryEngine/RelAlgExecutor.h"

#include <boost/filesystem/operations.hpp>

#define CALCITEPORT 39093

// Builds a session for the built-in "mapd" user on the system database stored under db_path.
// All preconditions (paths exist, user and db metadata are found, password matches, user is
// a superuser or owns the db) are enforced with CHECK instead of being reported to the caller.
// Returns a SessionInfo allocated with `new`; the caller receives the raw owning pointer.
Catalog_Namespace::SessionInfo* get_session(const char* db_path) {
std::string db_name{MAPD_SYSTEM_DB};
std::string user_name{"mapd"};
std::string passwd{"HyperInteractive"};
boost::filesystem::path base_path{db_path};
CHECK(boost::filesystem::exists(base_path));
// The system catalog file must already exist under <db_path>/mapd_catalogs/mapd.
auto system_db_file = base_path / "mapd_catalogs" / "mapd";
CHECK(boost::filesystem::exists(system_db_file));
auto data_dir = base_path / "mapd_data";
Catalog_Namespace::UserMetadata user;
Catalog_Namespace::DBMetadata db;
// NOTE(review): meaning of the -1 and 1024 arguments is not visible here -- confirm against Calcite's constructor.
auto calcite = std::make_shared<Calcite>(-1, CALCITEPORT, db_path, 1024);
ExtensionFunctionsWhitelist::add(calcite->getExtensionFunctionWhitelist());
// GPU use by the data manager is decided at compile time by HAVE_CUDA.
#ifdef HAVE_CUDA
bool useGpus = true;
#else
bool useGpus = false;
#endif
{
// Scoped DataMgr: used to initialise the SysCatalog singleton and read user/db metadata.
auto dataMgr = std::make_shared<Data_Namespace::DataMgr>(data_dir.string(), 0, useGpus, -1);
auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
sys_cat.init(base_path.string(), dataMgr, {}, calcite, false, false);
CHECK(sys_cat.getMetadataForUser(user_name, user));
CHECK_EQ(user.passwd, passwd);
CHECK(sys_cat.getMetadataForDB(db_name, db));
CHECK(user.isSuper || (user.userId == db.dbOwner));
}
// NOTE(review): a second DataMgr over the same data_dir is created for the Catalog instead of
// reusing the scoped one above -- confirm this is intentional.
auto dataMgr = std::make_shared<Data_Namespace::DataMgr>(data_dir.string(), 0, useGpus, -1);
// NOTE(review): the session device type is ExecutorDeviceType::GPU even when useGpus is false.
return new Catalog_Namespace::SessionInfo(std::make_shared<Catalog_Namespace::Catalog>(
base_path.string(), db, dataMgr, std::vector<LeafHostInfo>{}, calcite),
user,
ExecutorDeviceType::GPU,
"");
}

namespace {

Planner::RootPlan* parse_plan_legacy(const std::string& query_str,
Expand Down Expand Up @@ -110,6 +76,43 @@ Planner::RootPlan* parse_plan(const std::string& query_str,

} // namespace

namespace QueryRunner {

// Builds a session for the built-in "mapd" user on the system database stored under db_path.
// All preconditions (paths exist, user and db metadata are found, password matches, user is
// a superuser or owns the db) are enforced with CHECK instead of being reported to the caller.
// Returns a SessionInfo allocated with `new`; the caller receives the raw owning pointer.
Catalog_Namespace::SessionInfo* get_session(const char* db_path) {
std::string db_name{MAPD_SYSTEM_DB};
std::string user_name{"mapd"};
std::string passwd{"HyperInteractive"};
boost::filesystem::path base_path{db_path};
CHECK(boost::filesystem::exists(base_path));
// The system catalog file must already exist under <db_path>/mapd_catalogs/mapd.
auto system_db_file = base_path / "mapd_catalogs" / "mapd";
CHECK(boost::filesystem::exists(system_db_file));
auto data_dir = base_path / "mapd_data";
Catalog_Namespace::UserMetadata user;
Catalog_Namespace::DBMetadata db;
// NOTE(review): meaning of the -1 and 1024 arguments is not visible here -- confirm against Calcite's constructor.
auto calcite = std::make_shared<Calcite>(-1, CALCITEPORT, db_path, 1024);
ExtensionFunctionsWhitelist::add(calcite->getExtensionFunctionWhitelist());
// GPU use by the data manager is decided at compile time by HAVE_CUDA.
#ifdef HAVE_CUDA
bool useGpus = true;
#else
bool useGpus = false;
#endif
{
// Scoped DataMgr: used to initialise the SysCatalog singleton and read user/db metadata.
auto dataMgr = std::make_shared<Data_Namespace::DataMgr>(data_dir.string(), 0, useGpus, -1);
auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
sys_cat.init(base_path.string(), dataMgr, {}, calcite, false, false);
CHECK(sys_cat.getMetadataForUser(user_name, user));
CHECK_EQ(user.passwd, passwd);
CHECK(sys_cat.getMetadataForDB(db_name, db));
CHECK(user.isSuper || (user.userId == db.dbOwner));
}
// NOTE(review): a second DataMgr over the same data_dir is created for the Catalog instead of
// reusing the scoped one above -- confirm this is intentional.
auto dataMgr = std::make_shared<Data_Namespace::DataMgr>(data_dir.string(), 0, useGpus, -1);
// NOTE(review): the session device type is ExecutorDeviceType::GPU even when useGpus is false.
return new Catalog_Namespace::SessionInfo(std::make_shared<Catalog_Namespace::Catalog>(
base_path.string(), db, dataMgr, std::vector<LeafHostInfo>{}, calcite),
user,
ExecutorDeviceType::GPU,
"");
}

ExecutionResult run_select_query(const std::string& query_str,
const std::unique_ptr<Catalog_Namespace::SessionInfo>& session,
const ExecutorDeviceType device_type,
Expand All @@ -131,7 +134,7 @@ std::shared_ptr<ResultSet> run_multiple_agg(const std::string& query_str,
const bool hoist_literals,
const bool allow_loop_joins) {
ParserWrapper pw{query_str};
if( is_calcite_path_permissable( pw ) ) {
if (is_calcite_path_permissable(pw)) {
const auto execution_result = run_select_query(query_str, session, device_type, hoist_literals, allow_loop_joins);
return execution_result.getRows();
}
Expand All @@ -150,3 +153,19 @@ std::shared_ptr<ResultSet> run_multiple_agg(const std::string& query_str,
plan, *session, hoist_literals, device_type, ExecutorOptLevel::LoopStrengthReduction, false, allow_loop_joins);
#endif
}

/**
 * Parses `create_table_stmt` and executes it as a DDL statement within `session`.
 *
 * The text must parse cleanly into exactly one statement, and that statement must be a
 * Parser::DDLStmt. These conditions, and a non-null `session`, are enforced with CHECK,
 * so a violation is a fatal assertion rather than an exception the caller can handle.
 *
 * @param create_table_stmt  SQL text of the DDL statement to run
 * @param session            session the statement executes in; must not be null
 */
void run_ddl_statement(const std::string& create_table_stmt,
                       const std::unique_ptr<Catalog_Namespace::SessionInfo>& session) {
  // Guard the dereference at the bottom of the function.
  CHECK(session);

  SQLParser parser;
  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
  std::string last_parsed;
  CHECK_EQ(parser.parse(create_table_stmt, parse_trees, last_parsed), 0);
  CHECK_EQ(parse_trees.size(), size_t(1));

  // `parse_trees` keeps ownership of the statement; `ddl` is a non-owning view of it.
  auto stmt = parse_trees.front().get();
  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
  // CHECK already stops execution on a failed cast, so no additional null test follows.
  CHECK(ddl);
  ddl->execute(*session);
}

}  // namespace QueryRunner
18 changes: 15 additions & 3 deletions Tests/QueryRunner.h → QueryRunner/QueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
#ifndef QUERY_RUNNER_H
#define QUERY_RUNNER_H

#include "../Catalog/Catalog.h"
#include "../QueryEngine/Execute.h"
#include "../QueryEngine/RelAlgExecutionDescriptor.h"
#include "../QueryEngine/CompilationOptions.h"

#include <memory>
#include <string>

namespace Catalog_Namespace {
class SessionInfo;
} // namespace Catalog_Namespace

class ResultSet;
class ExecutionResult;

namespace QueryRunner {

Catalog_Namespace::SessionInfo* get_session(const char* db_path);

ExecutionResult run_select_query(const std::string& query_str,
Expand All @@ -38,4 +45,9 @@ std::shared_ptr<ResultSet> run_multiple_agg(const std::string& query_str,
const bool hoist_literals,
const bool allow_loop_joins);

void run_ddl_statement(const std::string& create_table_stmt,
const std::unique_ptr<Catalog_Namespace::SessionInfo>& session);

} // namespace QueryRunner

#endif // QUERY_RUNNER_H
Loading

0 comments on commit 85ec451

Please sign in to comment.