Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ void ArchiveReader::initialize_schema_reader(
auto& schema = (*m_schema_map)[schema_id];
reader.reset(
m_schema_tree,
m_projection,
schema_id,
schema.get_ordered_schema_view(),
m_id_to_table_metadata[schema_id].num_messages,
Expand Down
8 changes: 8 additions & 0 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "DictionaryReader.hpp"
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
#include "search/Projection.hpp"
#include "TimestampDictionaryReader.hpp"
#include "Utils.hpp"

Expand Down Expand Up @@ -133,6 +134,10 @@ class ArchiveReader {
*/
[[nodiscard]] std::vector<int32_t> const& get_schema_ids() const { return m_schema_ids; }

void set_projection(std::shared_ptr<search::Projection> projection) {
m_projection = projection;
}

private:
/**
* Initializes a schema reader passed by reference to become a reader for a given schema.
Expand Down Expand Up @@ -182,6 +187,9 @@ class ArchiveReader {
std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
std::vector<int32_t> m_schema_ids;
std::map<int32_t, SchemaReader::TableMetadata> m_id_to_table_metadata;
std::shared_ptr<search::Projection> m_projection{
std::make_shared<search::Projection>(search::ProjectionMode::ReturnAllColumns)
};

FileReader m_tables_file_reader;
FileReader m_table_metadata_file_reader;
Expand Down
2 changes: 2 additions & 0 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ set(
search/Output.hpp
search/OutputHandler.cpp
search/OutputHandler.hpp
search/Projection.cpp
search/Projection.hpp
search/SchemaMatch.cpp
search/SchemaMatch.hpp
search/SearchUtils.cpp
Expand Down
8 changes: 8 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
"archive-id",
po::value<std::string>(&m_archive_id)->value_name("ID"),
"Limit search to the archive with the given ID"
)(
"projection",
po::value<std::vector<std::string>>(&m_projection_columns)
->multitoken()
->value_name("COLUMN_A COLUMN_B ..."),
"Project only the given set of columns for matching results. This option must be"
" specified after all positional options. Values that are objects or structured"
" arrays are currently unsupported."
);
// clang-format on
search_options.add(match_options);
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class CommandLineArguments {

size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; }

std::vector<std::string> const& get_projection_columns() const { return m_projection_columns; }

private:
// Methods
/**
Expand Down Expand Up @@ -192,6 +194,7 @@ class CommandLineArguments {
std::optional<epochtime_t> m_search_begin_ts;
std::optional<epochtime_t> m_search_end_ts;
bool m_ignore_case{false};
std::vector<std::string> m_projection_columns;

// Decompression and search variables
std::string m_archive_id;
Expand Down
8 changes: 7 additions & 1 deletion components/core/src/clp_s/JsonSerializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ class JsonSerializer {

void begin_document() { m_json_string += "{"; }

void end_document() { m_json_string[m_json_string.size() - 1] = '}'; }
void end_document() {
if ('{' != m_json_string.back()) {
m_json_string[m_json_string.size() - 1] = '}';
} else {
m_json_string += '}';
}
}

void end_object() {
if (m_op_list[m_op_list_index - 2] != BeginObject
Expand Down
12 changes: 9 additions & 3 deletions components/core/src/clp_s/SchemaReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,19 +548,25 @@ void SchemaReader::initialize_serializer() {
m_serializer_initialized = true;

for (int32_t global_column_id : m_ordered_schema) {
generate_local_tree(global_column_id);
if (m_projection->matches_node(global_column_id)) {
generate_local_tree(global_column_id);
}
}

for (auto it = m_global_id_to_unordered_object.begin();
it != m_global_id_to_unordered_object.end();
++it)
{
generate_local_tree(it->first);
if (m_projection->matches_node(it->first)) {
generate_local_tree(it->first);
}
}

// TODO: this code will have to change once we allow mixing log lines parsed by different
// parsers.
generate_json_template(0);
if (false == m_local_schema_tree.get_nodes().empty()) {
generate_json_template(m_local_schema_tree.get_root_node_id());
}
}

void SchemaReader::generate_json_template(int32_t id) {
Expand Down
5 changes: 5 additions & 0 deletions components/core/src/clp_s/SchemaReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "FileReader.hpp"
#include "JsonSerializer.hpp"
#include "SchemaTree.hpp"
#include "search/Projection.hpp"
#include "ZstdDecompressor.hpp"

namespace clp_s {
Expand Down Expand Up @@ -71,13 +72,15 @@ class SchemaReader {
* to accept append_column calls for the new schema.
*
* @param schema_tree
* @param projection
* @param schema_id
* @param ordered_schema
* @param num_messages
* @param should_marshal_records
*/
void reset(
std::shared_ptr<SchemaTree> schema_tree,
std::shared_ptr<search::Projection> projection,
int32_t schema_id,
std::span<int32_t> ordered_schema,
uint64_t num_messages,
Expand All @@ -100,6 +103,7 @@ class SchemaReader {
m_local_schema_tree.clear();
m_json_serializer.clear();
m_global_schema_tree = std::move(schema_tree);
m_projection = std::move(projection);
m_should_marshal_records = should_marshal_records;
}

Expand Down Expand Up @@ -291,6 +295,7 @@ class SchemaReader {
JsonSerializer m_json_serializer;
bool m_should_marshal_records{true};
bool m_serializer_initialized{false};
std::shared_ptr<search::Projection> m_projection;

std::map<int32_t, std::pair<size_t, std::span<int32_t>>> m_global_id_to_unordered_object;
};
Expand Down
21 changes: 21 additions & 0 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "search/OrOfAndForm.hpp"
#include "search/Output.hpp"
#include "search/OutputHandler.hpp"
#include "search/Projection.hpp"
#include "search/SchemaMatch.hpp"
#include "TimestampPattern.hpp"
#include "TraceableException.hpp"
Expand All @@ -39,6 +40,7 @@ using clp_s::cArchiveFormatDevelopmentVersionFlag;
using clp_s::cEpochTimeMax;
using clp_s::cEpochTimeMin;
using clp_s::CommandLineArguments;
using clp_s::StringUtils;

namespace {
/**
Expand Down Expand Up @@ -179,6 +181,25 @@ bool search_archive(
return true;
}

// Populate projection
auto projection = std::make_shared<Projection>(
command_line_arguments.get_projection_columns().empty()
? ProjectionMode::ReturnAllColumns
: ProjectionMode::ReturnSelectedColumns
);
try {
for (auto const& column : command_line_arguments.get_projection_columns()) {
std::vector<std::string> descriptor_tokens;
StringUtils::tokenize_column_descriptor(column, descriptor_tokens);
projection->add_column(ColumnDescriptor::create(descriptor_tokens));
}
} catch (clp_s::TraceableException& e) {
SPDLOG_ERROR("{}", e.what());
return false;
}
projection->resolve_columns(archive_reader->get_schema_tree());
archive_reader->set_projection(projection);

std::unique_ptr<OutputHandler> output_handler;
try {
switch (command_line_arguments.get_output_handler_type()) {
Expand Down
88 changes: 88 additions & 0 deletions components/core/src/clp_s/search/Projection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "Projection.hpp"

#include <algorithm>

#include "SearchUtils.hpp"

namespace clp_s::search {
void Projection::add_column(std::shared_ptr<ColumnDescriptor> column) {
if (column->is_unresolved_descriptor()) {
throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__);
}
if (ProjectionMode::ReturnAllColumns == m_projection_mode) {
throw OperationFailed(ErrorCodeUnsupported, __FILE__, __LINE__);
}
if (m_selected_columns.end()
!= std::find_if(
m_selected_columns.begin(),
m_selected_columns.end(),
[column](auto const& rhs) -> bool { return *column == *rhs; }
))
{
// no duplicate columns in projection
throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__);
}
m_selected_columns.push_back(column);
}

void Projection::resolve_columns(std::shared_ptr<SchemaTree> tree) {
for (auto& column : m_selected_columns) {
resolve_column(tree, column);
}
}

void Projection::resolve_column(
std::shared_ptr<SchemaTree> tree,
std::shared_ptr<ColumnDescriptor> column
) {
/**
* Ideally we would reuse the code from SchemaMatch for resolving columns, but unfortunately we
* can not.
*
* The main reason is that here we don't want to allow projection to travel inside unstructured
* objects -- it may be possible to support such a thing in the future, but it poses some extra
* challenges (e.g. deciding what to do when projecting repeated elements in a structure).
*
* It would be possible to create code that can handle our use-case and SchemaMatch's use-case
* in an elegant way, but it's a significant refactor. In particular, if we extend our column
* type system to be one-per-token instead of one-per-column we can make it so that intermediate
* tokens will not match certain kinds of MPT nodes (like the node for structured arrays).
*
* In light of that we implement a simple version of column resolution here that does exactly
* what we need.
*/

auto cur_node_id = tree->get_root_node_id();
auto it = column->descriptor_begin();
while (it != column->descriptor_end()) {
bool matched_any{false};
auto cur_it = it++;
bool last_token = it == column->descriptor_end();
auto const& cur_node = tree->get_node(cur_node_id);
for (int32_t child_node_id : cur_node.get_children_ids()) {
auto const& child_node = tree->get_node(child_node_id);

// Intermediate nodes must be objects
if (false == last_token && child_node.get_type() != NodeType::Object) {
continue;
}

if (child_node.get_key_name() != cur_it->get_token()) {
continue;
}

matched_any = true;
if (last_token && column->matches_type(node_to_literal_type(child_node.get_type()))) {
m_matching_nodes.insert(child_node_id);
} else if (false == last_token) {
cur_node_id = child_node_id;
break;
}
}

if (false == matched_any) {
break;
}
}
}
} // namespace clp_s::search
82 changes: 82 additions & 0 deletions components/core/src/clp_s/search/Projection.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#ifndef CLP_S_SEARCH_PROJECTION_HPP
#define CLP_S_SEARCH_PROJECTION_HPP

#include <vector>

#include <absl/container/flat_hash_set.h>

#include "../SchemaTree.hpp"
#include "../TraceableException.hpp"
#include "ColumnDescriptor.hpp"

namespace clp_s::search {
enum ProjectionMode : uint8_t {
ReturnAllColumns,
ReturnSelectedColumns
};

/**
* This class describes the set of columns that should be included in the projected results.
*
* After adding columns and before calling matches_node the caller is responsible for calling
* resolve_columns.
*/
class Projection {
public:
class OperationFailed : public TraceableException {
public:
// Constructors
OperationFailed(ErrorCode error_code, char const* const filename, int line_number)
: TraceableException(error_code, filename, line_number) {}
};
Comment on lines +26 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider renaming OperationFailed to ProjectionOperationFailed for clarity.

The exception class OperationFailed has a generic name that may lead to confusion if similar exceptions exist elsewhere in the codebase. Renaming it to ProjectionOperationFailed will make its purpose more explicit and improve readability.

Apply this diff to rename the exception class:

-    class OperationFailed : public TraceableException {
+    class ProjectionOperationFailed : public TraceableException {

Remember to update all references to OperationFailed accordingly.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class OperationFailed : public TraceableException {
public:
// Constructors
OperationFailed(ErrorCode error_code, char const* const filename, int line_number)
: TraceableException(error_code, filename, line_number) {}
};
class ProjectionOperationFailed : public TraceableException {
public:
// Constructors
ProjectionOperationFailed(ErrorCode error_code, char const* const filename, int line_number)
: TraceableException(error_code, filename, line_number) {}
};


explicit Projection(ProjectionMode mode) : m_projection_mode{mode} {}

/**
* Adds a column to the set of columns that should be included in the projected results
* @param column
* @throws OperationFailed if `column` contains a wildcard
* @throws OperationFailed if this instance of Projection is in mode ReturnAllColumns
* @throws OperationFailed if `column` is identical to a previously added column
*/
void add_column(std::shared_ptr<ColumnDescriptor> column);

/**
* Resolves all columns for the purpose of projection. This key resolution implementation is
* more limited than the one in schema matching. In particular, this version of key resolution
* only allows resolving keys that do not contain wildcards and does not allow resolving to
* objects within arrays.
*
* Note: we could try to generalize column resolution code/move it to the schema tree. It is
* probably best to write a simpler version dedicated to projection for now since types are
* leaf-only. The type-per-token idea solves this problem (in the absence of wildcards).
*
* @param tree
*/
void resolve_columns(std::shared_ptr<SchemaTree> tree);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a description for this method?


/**
* Checks whether a column corresponding to given leaf node should be included in the output
* @param node_id
* @return true if the column should be included in the output, false otherwise
*/
bool matches_node(int32_t node_id) const {
return ProjectionMode::ReturnAllColumns == m_projection_mode
|| m_matching_nodes.contains(node_id);
}

private:
/**
* Resolves an individual column as described by the `resolve_columns` method.
* @param tree
* @param column
*/
void resolve_column(std::shared_ptr<SchemaTree> tree, std::shared_ptr<ColumnDescriptor> column);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also for this method.


std::vector<std::shared_ptr<ColumnDescriptor>> m_selected_columns;
absl::flat_hash_set<int32_t> m_matching_nodes;
ProjectionMode m_projection_mode{ProjectionMode::ReturnAllColumns};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove redundant in-class initialization of m_projection_mode.

Since m_projection_mode is always initialized by the constructor, the in-class initializer is redundant and can be removed to avoid potential confusion.

Apply this diff to remove the redundant initialization:

-    ProjectionMode m_projection_mode{ProjectionMode::ReturnAllColumns};
+    ProjectionMode m_projection_mode;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ProjectionMode m_projection_mode{ProjectionMode::ReturnAllColumns};
ProjectionMode m_projection_mode;

};
} // namespace clp_s::search

#endif // CLP_S_SEARCH_PROJECTION_HPP
Loading