Skip to content

Commit

Permalink
Support TextEncodingNone column arguments in table functions. (#7242)
Browse files Browse the repository at this point in the history
* Support TextEncodingNone column arguments in table functions.

* Implement .size()/.data() methods for struct Array<T>. Introduce operator() to return RowStruct. Fix tests.

* Replace arr.getSize() with arr.size()

* Introduce uses_flatbuffer member to SQLTypeInfo.

* Convert all UDTF inputs to flatbuffer when supported

* Refactor writeBackCell.

* Revise FlatBuffer memory layout management.

Signed-off-by: Misiu Godfrey <misiu.godfrey@kraken.mapd.com>
  • Loading branch information
pearu authored and misiugodfrey committed Aug 28, 2023
1 parent 06663e3 commit 50f7afa
Show file tree
Hide file tree
Showing 43 changed files with 1,787 additions and 855 deletions.
1 change: 1 addition & 0 deletions QueryEngine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ set(query_engine_source_files
StringDictionaryGenerations.cpp
TableFunctions/TestFunctions/ArrayTestTableFunctions.cpp
TableFunctions/TestFunctions/FilterPushdownTestTableFunctions.cpp
TableFunctions/TestFunctions/FlatBufferTableFunctions.cpp
TableFunctions/TestFunctions/GeoTestTableFunctions.cpp
TableFunctions/TestFunctions/OtherTestTableFunctions.cpp
TableFunctions/TestFunctions/RbcTestTableFunctions.cpp
Expand Down
31 changes: 25 additions & 6 deletions QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,26 @@ inline const ColumnarResults* columnarize_result(
const ResultSetPtr& result,
const size_t thread_idx,
const size_t executor_id,
const int frag_id) {
const int frag_id,
const bool convert_to_flatbuffer = false) {
INJECT_TIMER(columnarize_result);
CHECK_EQ(0, frag_id);

std::vector<SQLTypeInfo> col_types;
for (size_t i = 0; i < result->colCount(); ++i) {
col_types.push_back(get_logical_type_info(result->getColType(i)));
const auto& src_ti = result->getColType(i);
CHECK_EQ(result->checkSlotUsesFlatBufferFormat(i), src_ti.usesFlatBuffer());
auto ti = get_logical_type_info(src_ti);
if (src_ti.usesFlatBuffer() || ti.is_geometry() || ti.is_array()) {
// FlatBuffer layout is forced when the source column already uses it,
// for array types, and for geometry types (for geo data this is the
// only way to columnarize):
ti.setUsesFlatBuffer(true);
} else if (ti.supportsFlatBuffer()) {
// Otherwise, for the remaining FlatBuffer-capable types (such as text
// encoding none), using FlatBuffer layout will be enabled on demand only:
ti.setUsesFlatBuffer(convert_to_flatbuffer);
}
col_types.push_back(ti);
}
return new ColumnarResults(
row_set_mem_owner, *result, result->colCount(), col_types, executor_id, thread_idx);
Expand Down Expand Up @@ -73,7 +86,8 @@ std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
DeviceAllocator* device_allocator,
const size_t thread_idx,
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
ColumnCacheMap& column_cache) {
ColumnCacheMap& column_cache,
bool convert_to_flatbuffer) {
static std::mutex columnar_conversion_mutex;
auto timer = DEBUG_TIMER(__func__);
if (fragment.isEmptyPhysicalFragment()) {
Expand All @@ -84,6 +98,10 @@ std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
CHECK(!cd || !(cd->isVirtualCol));
const int8_t* col_buff = nullptr;
if (cd) { // real table
if (convert_to_flatbuffer) {
throw std::runtime_error(
"Conversion to flatbuffer for real tables not supported yet");
}
/* chunk_meta_it is used here to retrieve chunk numBytes and
numElements. Apparently, their values are often zeros. If we
knew how to predict the zero values, calling
Expand Down Expand Up @@ -128,7 +146,8 @@ std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
get_temporary_table(executor->temporary_tables_, table_key.table_id),
executor->executor_id_,
thread_idx,
frag_id))));
frag_id,
convert_to_flatbuffer))));
}
col_frag = column_cache[table_key][frag_id].get();
}
Expand Down Expand Up @@ -995,8 +1014,8 @@ const int8_t* ColumnFetcher::transferColumnIfNeeded(
if (memory_level == Data_Namespace::GPU_LEVEL) {
const auto& col_ti = columnar_results->getColumnType(col_id);
size_t num_bytes;
if (col_ti.supports_flatbuffer() &&
FlatBufferManager::isFlatBuffer(col_buffers[col_id])) {
if (col_ti.usesFlatBuffer()) {
CHECK(FlatBufferManager::isFlatBuffer(col_buffers[col_id]));
num_bytes = FlatBufferManager::getBufferSize(col_buffers[col_id]);
} else {
num_bytes = columnar_results->size() * col_ti.get_size();
Expand Down
3 changes: 2 additions & 1 deletion QueryEngine/ColumnFetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class ColumnFetcher {
DeviceAllocator* device_allocator,
const size_t thread_idx,
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
ColumnCacheMap& column_cache);
ColumnCacheMap& column_cache,
bool convert_to_flatbuffer = false);

//! Creates a JoinColumn struct containing an array of JoinChunk structs.
static JoinColumn makeJoinColumn(
Expand Down
17 changes: 9 additions & 8 deletions QueryEngine/ColumnIR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ std::shared_ptr<Decoder> get_col_decoder(const Analyzer::ColumnVar* col_var) {
case kDATE:
return std::make_shared<FixedWidthInt>(8);
default:
CHECK(false);
CHECK(false) << "ti=" << ti;
}
}
case kENCODING_DICT:
Expand Down Expand Up @@ -119,6 +119,10 @@ std::vector<llvm::Value*> CodeGenerator::codegenColVar(const Analyzer::ColumnVar
return {codegenRowId(col_var, co)};
}
const auto col_ti = cd->columnType;
if (col_ti.usesFlatBuffer()) {
throw std::runtime_error(
"Flatbuffer storage in a real table column not supported yet");
}
if (col_ti.get_physical_coord_cols() > 0) {
std::vector<llvm::Value*> cols;
const auto col_id = column_key.column_id;
Expand All @@ -144,12 +148,6 @@ std::vector<llvm::Value*> CodeGenerator::codegenColVar(const Analyzer::ColumnVar
}
return cols;
}
} else {
const auto& col_ti = col_var->get_type_info();
if (col_ti.is_geometry() && !col_ti.supports_flatbuffer()) {
throw std::runtime_error(
"Geospatial columns not supported in temporary tables yet");
}
}
const auto grouped_col_lv = resolveGroupedColumnReference(col_var);
if (grouped_col_lv) {
Expand Down Expand Up @@ -212,7 +210,10 @@ std::vector<llvm::Value*> CodeGenerator::codegenColVar(const Analyzer::ColumnVar
}
return varlen_str_column_lvs;
}
if (col_ti.supports_flatbuffer()) {
if (col_ti.usesFlatBuffer()) {
return {col_byte_stream};
}
if (col_ti.is_array() || col_ti.get_type() == kPOINT) {
return {col_byte_stream};
}
if (window_func_context) {
Expand Down
Loading

0 comments on commit 50f7afa

Please sign in to comment.