Merge pull request #121 from carlopi/bumpv111
Bump to DuckDB v1.1.1
carlopi authored Sep 24, 2024
2 parents b3bf03b + 4d2b736 commit 874c0dd
Showing 146 changed files with 2,272 additions and 1,800 deletions.
3 changes: 2 additions & 1 deletion binding.gyp
@@ -384,7 +384,8 @@
       "DUCKDB_EXTENSION_ICU_LINKED",
       "DUCKDB_EXTENSION_JSON_LINKED",
       "DUCKDB_EXTENSION_AUTOLOAD_DEFAULT=1",
-      "DUCKDB_EXTENSION_AUTOINSTALL_DEFAULT=1"
+      "DUCKDB_EXTENSION_AUTOINSTALL_DEFAULT=1",
+      "NDEBUG"
     ],
     "cflags_cc": [
       "-frtti",
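
Note: adding NDEBUG makes this a true release build of the binding; the standard effect is that assert() compiles to a no-op in every C/C++ translation unit. A minimal standalone illustration (not from this diff):

    // Build with `g++ demo.cpp` to keep asserts, `g++ -DNDEBUG demo.cpp` to drop them.
    #include <cassert>
    #include <cstdio>

    int main() {
        assert(2 + 2 == 5); // aborts the program unless NDEBUG compiled it out
        std::puts("only reachable when NDEBUG is defined");
        return 0;
    }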

Large diffs are not rendered by default.

18 changes: 14 additions & 4 deletions src/duckdb/extension/json/include/json_common.hpp
@@ -241,12 +241,16 @@ struct JSONCommon {
     };

     //! Get JSON value using JSON path query (safe, checks the path query)
-    static inline yyjson_val *Get(yyjson_val *val, const string_t &path_str) {
+    static inline yyjson_val *Get(yyjson_val *val, const string_t &path_str, bool integral_argument) {
         auto ptr = path_str.GetData();
         auto len = path_str.GetSize();
         if (len == 0) {
             return GetUnsafe(val, ptr, len);
         }
+        if (integral_argument) {
+            auto str = "$[" + path_str.GetString() + "]";
+            return GetUnsafe(val, str.c_str(), str.length());
+        }
         switch (*ptr) {
         case '/': {
             // '/' notation must be '\0'-terminated
@@ -260,9 +264,15 @@
             }
             return GetUnsafe(val, ptr, len);
         }
-        default:
-            auto str = "/" + string(ptr, len);
-            return GetUnsafe(val, str.c_str(), len + 1);
+        default: {
+            string path;
+            if (memchr(ptr, '"', len)) {
+                path = "/" + string(ptr, len);
+            } else {
+                path = "$.\"" + path_str.GetString() + "\"";
+            }
+            return GetUnsafe(val, path.c_str(), path.length());
+        }
         }
     }

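Taken together, the two hunks above normalize the path argument: an integral argument is wrapped as $[i], a bare key becomes $."key", and keys containing a double quote fall back to the older JSON Pointer form. A standalone sketch of that normalization (my reading of the diff, not DuckDB code; the empty path is handled upstream):

    #include <cstring>
    #include <iostream>
    #include <string>

    static std::string NormalizePath(const std::string &p, bool integral_argument) {
        if (p.empty()) {
            return p; // evaluated as-is by the caller
        }
        if (integral_argument) {
            return "$[" + p + "]"; // e.g. 0 -> $[0]
        }
        if (p[0] == '/' || p[0] == '$') {
            return p; // already JSON Pointer or JSONPath syntax
        }
        if (std::memchr(p.data(), '"', p.size())) {
            return "/" + p; // a quote would break $."...", use pointer syntax
        }
        return "$.\"" + p + "\""; // bare key: dots and spaces stay literal
    }

    int main() {
        std::cout << NormalizePath("0", true) << '\n';       // $[0]
        std::cout << NormalizePath("my.key", false) << '\n'; // $."my.key"
        std::cout << NormalizePath("/a/b", false) << '\n';   // /a/b
        return 0;
    }
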
14 changes: 11 additions & 3 deletions src/duckdb/extension/json/include/json_executors.hpp
@@ -8,6 +8,7 @@

 #pragma once

+#include "duckdb/common/vector_operations/vector_operations.hpp"
 #include "duckdb/execution/expression_executor.hpp"
 #include "json_functions.hpp"

@@ -88,11 +89,18 @@ struct JSONExecutors {
             }
         } else { // Columnref path
             D_ASSERT(info.path_type == JSONCommon::JSONPathType::REGULAR);
-            auto &paths = args.data[1];
+            unique_ptr<Vector> casted_paths;
+            if (args.data[1].GetType().id() == LogicalTypeId::VARCHAR) {
+                casted_paths = make_uniq<Vector>(args.data[1]);
+            } else {
+                casted_paths = make_uniq<Vector>(LogicalTypeId::VARCHAR);
+                VectorOperations::DefaultCast(args.data[1], *casted_paths, args.size(), true);
+            }
             BinaryExecutor::ExecuteWithNulls<string_t, string_t, T>(
-                inputs, paths, result, args.size(), [&](string_t input, string_t path, ValidityMask &mask, idx_t idx) {
+                inputs, *casted_paths, result, args.size(),
+                [&](string_t input, string_t path, ValidityMask &mask, idx_t idx) {
                     auto doc = JSONCommon::ReadDocument(input, JSONCommon::READ_FLAG, lstate.json_allocator.GetYYAlc());
-                    auto val = JSONCommon::Get(doc->root, path);
+                    auto val = JSONCommon::Get(doc->root, path, args.data[1].GetType().IsIntegral());
                     if (SET_NULL_IF_NOT_FOUND && !val) {
                         mask.SetInvalid(idx);
                         return T {};
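
For the column-path case, a non-VARCHAR path vector is now cast to VARCHAR once per DataChunk via VectorOperations::DefaultCast (or wrapped without copying when it is already VARCHAR), and the integral flag is forwarded into JSONCommon::Get. The chunk-at-a-time shape of that cast, reduced to plain C++ (illustrative only):

    #include <cstdint>
    #include <string>
    #include <vector>

    // Convert the whole column up front so the per-row lambda only sees strings.
    std::vector<std::string> AsStringColumn(const std::vector<int64_t> &ints) {
        std::vector<std::string> out;
        out.reserve(ints.size());
        for (auto v : ints) {
            out.push_back(std::to_string(v)); // one pass per chunk, not per row
        }
        return out;
    }
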
2 changes: 1 addition & 1 deletion src/duckdb/extension/json/json_extension.cpp
@@ -27,7 +27,7 @@ static DefaultMacro json_macros[] = {
     {DEFAULT_SCHEMA,
      "json_group_structure",
      {"x", nullptr},
      {{nullptr, nullptr}},
-     "json_structure(json_group_array(x))->'0'"},
+     "json_structure(json_group_array(x))->0"},
     {DEFAULT_SCHEMA, "json", {"x", nullptr}, {{nullptr, nullptr}}, "json_extract(x, '$')"},
     {nullptr, nullptr, {nullptr}, {{nullptr, nullptr}}, nullptr}};

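The macro drops its quotes because '0' is no longer an array index: under the new path rules a string argument is the object key $."0", while the integer 0 is the array element $[0], which is what json_group_structure needs. A small end-to-end check using the public C++ client (assumes the JSON extension is available; expected results reflect my reading of the new semantics):

    #include "duckdb.hpp"
    #include <iostream>

    int main() {
        duckdb::DuckDB db(nullptr);
        duckdb::Connection con(db);
        // Integer path -> array index, string path -> object key.
        auto idx = con.Query("SELECT '[10, 20, 30]'::JSON -> 0");
        auto key = con.Query("SELECT '{\"0\": \"zero\"}'::JSON -> '0'");
        std::cout << idx->ToString(); // expect 10
        std::cout << key->ToString(); // expect "zero"
        return 0;
    }
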
23 changes: 16 additions & 7 deletions src/duckdb/extension/json/json_functions.cpp
@@ -21,21 +21,25 @@ static JSONPathType CheckPath(const Value &path_val, string &path, size_t &len)
     const auto path_str_val = path_val.DefaultCastAs(LogicalType::VARCHAR);
     auto path_str = path_str_val.GetValueUnsafe<string_t>();
     len = path_str.GetSize();
-    auto ptr = path_str.GetData();
+    const auto ptr = path_str.GetData();
     // Empty strings and invalid $ paths yield an error
     if (len == 0) {
         throw BinderException("Empty JSON path");
     }
     JSONPathType path_type = JSONPathType::REGULAR;
-    if (*ptr == '$') {
-        path_type = JSONCommon::ValidatePath(ptr, len, true);
-    }
     // Copy over string to the bind data
     if (*ptr == '/' || *ptr == '$') {
         path = string(ptr, len);
-    } else {
+    } else if (path_val.type().IsIntegral()) {
+        path = "$[" + string(ptr, len) + "]";
+    } else if (memchr(ptr, '"', len)) {
         path = "/" + string(ptr, len);
-        len++;
+    } else {
+        path = "$.\"" + string(ptr, len) + "\"";
     }
+    len = path.length();
+    if (*path.c_str() == '$') {
+        path_type = JSONCommon::ValidatePath(path.c_str(), len, true);
+    }
     return path_type;
 }
@@ -67,7 +71,11 @@ unique_ptr<FunctionData> JSONReadFunctionData::Bind(ClientContext &context, Scal
             path_type = CheckPath(path_val, path, len);
         }
     }
-    bound_function.arguments[1] = LogicalType::VARCHAR;
+    if (arguments[1]->return_type.IsIntegral()) {
+        bound_function.arguments[1] = LogicalType::BIGINT;
+    } else {
+        bound_function.arguments[1] = LogicalType::VARCHAR;
+    }
     if (path_type == JSONCommon::JSONPathType::WILDCARD) {
         bound_function.return_type = LogicalType::LIST(bound_function.return_type);
     }
@@ -117,6 +125,7 @@ unique_ptr<FunctionData> JSONReadManyFunctionData::Bind(ClientContext &context,

 JSONFunctionLocalState::JSONFunctionLocalState(Allocator &allocator) : json_allocator(allocator) {
 }
+
 JSONFunctionLocalState::JSONFunctionLocalState(ClientContext &context)
     : JSONFunctionLocalState(BufferAllocator::Get(context)) {
 }
14 changes: 11 additions & 3 deletions src/duckdb/extension/json/json_functions/json_extract.cpp
@@ -6,9 +6,17 @@ static inline string_t ExtractFromVal(yyjson_val *val, yyjson_alc *alc, Vector &
     return JSONCommon::WriteVal<yyjson_val>(val, alc);
 }

-static inline string_t ExtractStringFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &, idx_t) {
-    return yyjson_is_str(val) ? string_t(unsafe_yyjson_get_str(val), unsafe_yyjson_get_len(val))
-                              : JSONCommon::WriteVal<yyjson_val>(val, alc);
+static inline string_t ExtractStringFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &mask, idx_t idx) {
+    switch (yyjson_get_tag(val)) {
+    case YYJSON_TYPE_NULL | YYJSON_SUBTYPE_NONE:
+        mask.SetInvalid(idx);
+        return string_t {};
+    case YYJSON_TYPE_STR | YYJSON_SUBTYPE_NOESC:
+    case YYJSON_TYPE_STR | YYJSON_SUBTYPE_NONE:
+        return string_t(unsafe_yyjson_get_str(val), unsafe_yyjson_get_len(val));
+    default:
+        return JSONCommon::WriteVal<yyjson_val>(val, alc);
+    }
 }

 static void ExtractFunction(DataChunk &args, ExpressionState &state, Vector &result) {
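
yyjson packs a value's type and subtype into a single tag, so the rewritten ExtractStringFromVal can separate three cases in one switch: JSON null (now surfaced as SQL NULL via the validity mask), strings (returned without re-serialization, so no quotes are added), and everything else (written back out as JSON text). A minimal sketch of the same dispatch (assumes the yyjson C API used in the hunk):

    #include "yyjson.h"

    static const char *Classify(yyjson_val *val) {
        switch (yyjson_get_tag(val)) {
        case YYJSON_TYPE_NULL | YYJSON_SUBTYPE_NONE:
            return "null";   // becomes SQL NULL through the validity mask
        case YYJSON_TYPE_STR | YYJSON_SUBTYPE_NOESC:
        case YYJSON_TYPE_STR | YYJSON_SUBTYPE_NONE:
            return "string"; // raw string bytes, no JSON quoting
        default:
            return "other";  // arrays, objects, numbers: serialized as JSON
        }
    }
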
7 changes: 4 additions & 3 deletions src/duckdb/extension/json/json_functions/json_value.cpp
@@ -4,6 +4,7 @@ namespace duckdb {

 static inline string_t ValueFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &mask, idx_t idx) {
     switch (yyjson_get_tag(val)) {
+    case YYJSON_TYPE_NULL | YYJSON_SUBTYPE_NONE:
     case YYJSON_TYPE_ARR | YYJSON_SUBTYPE_NONE:
     case YYJSON_TYPE_OBJ | YYJSON_SUBTYPE_NONE:
         mask.SetInvalid(idx);
@@ -22,12 +23,12 @@ static void ValueManyFunction(DataChunk &args, ExpressionState &state, Vector &r
 }

 static void GetValueFunctionsInternal(ScalarFunctionSet &set, const LogicalType &input_type) {
-    set.AddFunction(ScalarFunction({input_type, LogicalType::BIGINT}, LogicalType::JSON(), ValueFunction,
+    set.AddFunction(ScalarFunction({input_type, LogicalType::BIGINT}, LogicalType::VARCHAR, ValueFunction,
                                    JSONReadFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
-    set.AddFunction(ScalarFunction({input_type, LogicalType::VARCHAR}, LogicalType::JSON(), ValueFunction,
+    set.AddFunction(ScalarFunction({input_type, LogicalType::VARCHAR}, LogicalType::VARCHAR, ValueFunction,
                                    JSONReadFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
     set.AddFunction(ScalarFunction({input_type, LogicalType::LIST(LogicalType::VARCHAR)},
-                                   LogicalType::LIST(LogicalType::JSON()), ValueManyFunction,
+                                   LogicalType::LIST(LogicalType::VARCHAR), ValueManyFunction,
                                    JSONReadManyFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
 }

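Two behavior changes land here: json_value now also maps JSON null to SQL NULL (alongside arrays and objects), and its declared return type drops from JSON to plain VARCHAR. Expected behavior, shown as SQL in comments (my reading, worth verifying):

    // SELECT json_value('{"a": 42}', '$.a');     -- '42' (VARCHAR, not JSON)
    // SELECT json_value('{"a": [1, 2]}', '$.a'); -- NULL (array)
    // SELECT json_value('{"a": null}', '$.a');   -- NULL (JSON null, new here)
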
3 changes: 3 additions & 0 deletions src/duckdb/extension/parquet/column_reader.cpp
@@ -259,6 +259,9 @@ void ColumnReader::PrepareRead(parquet_filter_t &filter) {
         break;
     case PageType::DICTIONARY_PAGE:
         PreparePage(page_hdr);
+        if (page_hdr.dictionary_page_header.num_values < 0) {
+            throw std::runtime_error("Invalid dictionary page header (num_values < 0)");
+        }
         Dictionary(std::move(block), page_hdr.dictionary_page_header.num_values);
         break;
     default:
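
num_values arrives from the file's Thrift metadata as a signed 32-bit integer, so a corrupt or hostile Parquet file can carry a negative count; rejecting it before it is used as an element count avoids a wrap-around to a huge unsigned size. The check, reduced to a standalone sketch (illustrative):

    #include <cstdint>
    #include <stdexcept>

    inline uint64_t CheckedCount(int32_t num_values) {
        if (num_values < 0) {
            throw std::runtime_error("Invalid dictionary page header (num_values < 0)");
        }
        return static_cast<uint64_t>(num_values); // safe to use as a size now
    }
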
97 changes: 54 additions & 43 deletions src/duckdb/extension/parquet/column_writer.cpp
@@ -1209,47 +1209,6 @@ class IntervalColumnWriter : public BasicColumnWriter {
     }
 };

-//===--------------------------------------------------------------------===//
-// Geometry Column Writer
-//===--------------------------------------------------------------------===//
-// This class just wraps another column writer, but also calculates the extent
-// of the geometry column by updating the geodata object with every written
-// vector.
-template <class WRITER_IMPL>
-class GeometryColumnWriter : public WRITER_IMPL {
-    GeoParquetColumnMetadata geo_data;
-    GeoParquetColumnMetadataWriter geo_data_writer;
-    string column_name;
-
-public:
-    void Write(ColumnWriterState &state, Vector &vector, idx_t count) override {
-        // Just write normally
-        WRITER_IMPL::Write(state, vector, count);
-
-        // And update the geodata object
-        geo_data_writer.Update(geo_data, vector, count);
-    }
-    void FinalizeWrite(ColumnWriterState &state) override {
-        WRITER_IMPL::FinalizeWrite(state);
-
-        // Add the geodata object to the writer
-        this->writer.GetGeoParquetData().geometry_columns[column_name] = geo_data;
-    }
-
-public:
-    GeometryColumnWriter(ClientContext &context, ParquetWriter &writer, idx_t schema_idx, vector<string> schema_path_p,
-                         idx_t max_repeat, idx_t max_define, bool can_have_nulls, string name)
-        : WRITER_IMPL(writer, schema_idx, std::move(schema_path_p), max_repeat, max_define, can_have_nulls),
-          geo_data_writer(context), column_name(std::move(name)) {
-
-        auto &geo_data = writer.GetGeoParquetData();
-        if (geo_data.primary_geometry_column.empty()) {
-            // Set the first column to the primary column
-            geo_data.primary_geometry_column = column_name;
-        }
-    }
-};
-
 //===--------------------------------------------------------------------===//
 // String Column Writer
 //===--------------------------------------------------------------------===//
@@ -1563,6 +1522,58 @@ class StringColumnWriter : public BasicColumnWriter {
     }
 };

+//===--------------------------------------------------------------------===//
+// WKB Column Writer
+//===--------------------------------------------------------------------===//
+// Used to store the metadata for a WKB-encoded geometry column when writing
+// GeoParquet files.
+class WKBColumnWriterState final : public StringColumnWriterState {
+public:
+    WKBColumnWriterState(ClientContext &context, duckdb_parquet::format::RowGroup &row_group, idx_t col_idx)
+        : StringColumnWriterState(row_group, col_idx), geo_data(), geo_data_writer(context) {
+    }
+
+    GeoParquetColumnMetadata geo_data;
+    GeoParquetColumnMetadataWriter geo_data_writer;
+};
+
+class WKBColumnWriter final : public StringColumnWriter {
+public:
+    WKBColumnWriter(ClientContext &context_p, ParquetWriter &writer, idx_t schema_idx, vector<string> schema_path_p,
+                    idx_t max_repeat, idx_t max_define, bool can_have_nulls, string name)
+        : StringColumnWriter(writer, schema_idx, std::move(schema_path_p), max_repeat, max_define, can_have_nulls),
+          column_name(std::move(name)), context(context_p) {
+
+        this->writer.GetGeoParquetData().RegisterGeometryColumn(column_name);
+    }
+
+    unique_ptr<ColumnWriterState> InitializeWriteState(duckdb_parquet::format::RowGroup &row_group) override {
+        auto result = make_uniq<WKBColumnWriterState>(context, row_group, row_group.columns.size());
+        RegisterToRowGroup(row_group);
+        return std::move(result);
+    }
+    void Write(ColumnWriterState &state, Vector &vector, idx_t count) override {
+        StringColumnWriter::Write(state, vector, count);
+
+        auto &geo_state = state.Cast<WKBColumnWriterState>();
+        geo_state.geo_data_writer.Update(geo_state.geo_data, vector, count);
+    }
+
+    void FinalizeWrite(ColumnWriterState &state) override {
+        StringColumnWriter::FinalizeWrite(state);
+
+        // Add the geodata object to the writer
+        const auto &geo_state = state.Cast<WKBColumnWriterState>();
+
+        // Merge this state's geo column data with the writer's geo column data
+        writer.GetGeoParquetData().FlushColumnMeta(column_name, geo_state.geo_data);
+    }
+
+private:
+    string column_name;
+    ClientContext &context;
+};
+
 //===--------------------------------------------------------------------===//
 // Enum Column Writer
 //===--------------------------------------------------------------------===//
@@ -2234,8 +2245,8 @@ unique_ptr<ColumnWriter> ColumnWriter::CreateWriterRecursive(ClientContext &cont
     schema_path.push_back(name);

     if (type.id() == LogicalTypeId::BLOB && type.GetAlias() == "WKB_BLOB") {
-        return make_uniq<GeometryColumnWriter<StringColumnWriter>>(context, writer, schema_idx, std::move(schema_path),
-                                                                   max_repeat, max_define, can_have_nulls, name);
+        return make_uniq<WKBColumnWriter>(context, writer, schema_idx, std::move(schema_path), max_repeat, max_define,
+                                          can_have_nulls, name);
     }

     switch (type.id()) {
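
The refactor replaces the GeometryColumnWriter<StringColumnWriter> wrapper, which accumulated geometry metadata in a single member shared by every row group, with a concrete WKBColumnWriter whose accumulator lives in the per-row-group write state and is merged once in FinalizeWrite; presumably this keeps parallel row-group writes from racing on one geo_data object. The ownership change, as a generic sketch (not DuckDB's classes):

    #include <cstdio>

    struct RowGroupState {
        int local_count = 0; // accumulated privately while one row group is written
    };

    struct FileWriter {
        int merged_count = 0; // shared, touched only at finalize time
        void FinalizeWrite(RowGroupState &state) {
            merged_count += state.local_count; // single merge point (lock elided)
        }
    };

    int main() {
        FileWriter writer;
        RowGroupState a, b; // could be filled by different threads
        a.local_count = 3;
        b.local_count = 4;
        writer.FinalizeWrite(a);
        writer.FinalizeWrite(b);
        std::printf("%d\n", writer.merged_count); // 7
        return 0;
    }
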
19 changes: 19 additions & 0 deletions src/duckdb/extension/parquet/geo_parquet.cpp
@@ -279,6 +279,17 @@ GeoParquetFileMetadata::TryRead(const duckdb_parquet::format::FileMetaData &file
     return nullptr;
 }

+void GeoParquetFileMetadata::FlushColumnMeta(const string &column_name, const GeoParquetColumnMetadata &meta) {
+    // Lock the metadata
+    lock_guard<mutex> glock(write_lock);
+
+    auto &column = geometry_columns[column_name];
+
+    // Combine the metadata
+    column.geometry_types.insert(meta.geometry_types.begin(), meta.geometry_types.end());
+    column.bbox.Combine(meta.bbox);
+}
+
 void GeoParquetFileMetadata::Write(duckdb_parquet::format::FileMetaData &file_meta_data) const {

     yyjson_mut_doc *doc = yyjson_mut_doc_new(nullptr);
@@ -349,6 +360,14 @@ bool GeoParquetFileMetadata::IsGeometryColumn(const string &column_name) const {
     return geometry_columns.find(column_name) != geometry_columns.end();
 }

+void GeoParquetFileMetadata::RegisterGeometryColumn(const string &column_name) {
+    lock_guard<mutex> glock(write_lock);
+    if (primary_geometry_column.empty()) {
+        primary_geometry_column = column_name;
+    }
+    geometry_columns[column_name] = GeoParquetColumnMetadata();
+}
+
 unique_ptr<ColumnReader> GeoParquetFileMetadata::CreateColumnReader(ParquetReader &reader,
                                                                     const LogicalType &logical_type,
                                                                     const SchemaElement &s_ele, idx_t schema_idx_p,
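
Together with WKBColumnWriter above, this completes the accumulate-then-merge scheme: each write state collects geometry types and bounds privately, and FlushColumnMeta folds them into the file-level map under write_lock, so the lock is held only for the merge. The same pattern in plain C++ (illustrative; the bbox merge is elided):

    #include <mutex>
    #include <set>
    #include <string>
    #include <unordered_map>

    struct ColumnMeta {
        std::set<std::string> geometry_types; // bbox omitted for brevity
    };

    class FileMeta {
    public:
        void FlushColumnMeta(const std::string &name, const ColumnMeta &local) {
            std::lock_guard<std::mutex> guard(write_lock); // only shared step
            auto &column = columns[name];
            column.geometry_types.insert(local.geometry_types.begin(),
                                         local.geometry_types.end());
        }

    private:
        std::mutex write_lock;
        std::unordered_map<std::string, ColumnMeta> columns;
    };
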
16 changes: 10 additions & 6 deletions src/duckdb/extension/parquet/include/geo_parquet.hpp
@@ -115,25 +115,29 @@ class GeoParquetColumnMetadataWriter {
     void Update(GeoParquetColumnMetadata &meta, Vector &vector, idx_t count);
 };

-struct GeoParquetFileMetadata {
+class GeoParquetFileMetadata {
 public:
     // Try to read GeoParquet metadata. Returns nullptr if not found, invalid or the required spatial extension is not
     // available.
     static unique_ptr<GeoParquetFileMetadata> TryRead(const duckdb_parquet::format::FileMetaData &file_meta_data,
                                                       ClientContext &context);
     void Write(duckdb_parquet::format::FileMetaData &file_meta_data) const;

-public:
-    // Default to 1.1.0 for now
-    string version = "1.1.0";
-    string primary_geometry_column;
-    unordered_map<string, GeoParquetColumnMetadata> geometry_columns;
+    void FlushColumnMeta(const string &column_name, const GeoParquetColumnMetadata &meta);
+    const unordered_map<string, GeoParquetColumnMetadata> &GetColumnMeta() const;

     unique_ptr<ColumnReader> CreateColumnReader(ParquetReader &reader, const LogicalType &logical_type,
                                                 const duckdb_parquet::format::SchemaElement &s_ele, idx_t schema_idx_p,
                                                 idx_t max_define_p, idx_t max_repeat_p, ClientContext &context);

     bool IsGeometryColumn(const string &column_name) const;
+    void RegisterGeometryColumn(const string &column_name);
+
+private:
+    mutex write_lock;
+    string version = "1.1.0";
+    string primary_geometry_column;
+    unordered_map<string, GeoParquetColumnMetadata> geometry_columns;
 };

 } // namespace duckdb
@@ -68,9 +68,9 @@ class TemplatedColumnReader : public ColumnReader {

     void Offsets(uint32_t *offsets, uint8_t *defines, uint64_t num_values, parquet_filter_t &filter,
                  idx_t result_offset, Vector &result) override {
-        if (!dict) {
-            throw IOException(
-                "Parquet file is likely corrupted, cannot have dictionary offsets without seeing a dictionary first.");
+        if (!dict || dict->len == 0) {
+            throw IOException("Parquet file is likely corrupted, cannot have dictionary offsets without seeing a "
+                              "non-empty dictionary first.");
         }
         if (HasDefines()) {
             OffsetsInternal<true>(*dict, offsets, defines, num_values, filter, result_offset, result);
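
The guard widens from "no dictionary seen" to "no usable dictionary": a dictionary page with zero entries would otherwise let a later offsets page index into an empty buffer. The invariant as a standalone sketch (illustrative types):

    #include <cstddef>

    struct DictBuffer {
        const unsigned char *ptr;
        std::size_t len;
    };

    // Dictionary offsets are only meaningful against a non-empty dictionary.
    inline bool DictionaryUsable(const DictBuffer *dict) {
        return dict != nullptr && dict->len != 0;
    }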