Request
Whenever we need to support near real-time search, we cannot rely on being able to aggregate data into archives before having to search it. Instead, we need to be able to search the data in whatever representation it happens to exist in prior to aggregation into an archive, which should be kv-ir in the common case.
We will need this kv-ir search functionality fairly soon in order to support a complete Presto solution, but it will also be useful for other near real-time use-cases such as (potentially) in the package.
To support the immediate Presto use-case, the MVP requires:
- The ability to support KQL search semantics identical to searching a clp-s archive
- Projection support
Metadata-based top-level filtering is also necessary for Presto support, but such support lies outside the scope of the core search functionality discussed here.
Possible implementation
High level organization
Search support will be implemented by wrapping `clp::ffi::ir_stream::Deserializer`. This wrapper class will accept a query in the form of a `std::shared_ptr<clp_s::search::Expression>`, a vector of strings representing columns to project (which will be treated according to the KQL column parsing rules), and a callback (likely passed as a template parameter matching a concept, as we do with `IrUnitHandlerInterface`) that notifies the user of the search wrapper about nodes that match their requested projection.
This wrapper will provide an interface that acts somewhat like `clp::ffi::ir_stream::Deserializer::deserialize_next_ir_unit`, except it will only return the `IrUnitType::LogEvent` units that match the query.
For each returned `IrUnitType::LogEvent`, the user will be responsible for picking out the columns from their projection based on information passed to them via their callback function. In particular, the callback will have a signature like `(std::string_view column, uint32_t new_node_id, bool autogen) -> void`, which notifies the user of new matching node ids in the auto-generated and user-generated schema trees. We prefer a solution like this over wrapping `IrUnitType::LogEvent` with another class that only forwards the projected columns because it requires fewer allocations and less compute, and it directly ties the matching node ids to each of the column names passed by the user.
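As a rough sketch, the callback constraint could be expressed as a concept along the following lines. The concept and handler names are placeholders for illustration, not part of the existing clp API; only the `(column, node id, autogen)` shape comes from this proposal.

```cpp
#include <concepts>
#include <cstdint>
#include <string_view>

// Hypothetical constraint for the projection callback described above.
template <typename CallbackType>
concept ProjectionCallback
        = std::invocable<CallbackType, std::string_view, uint32_t, bool>;

// Example handler satisfying the constraint.
struct ExampleProjectionHandler {
    // Invoked whenever a newly added schema-tree node matches a projected
    // column, in either the auto-generated or user-generated schema tree.
    void operator()(std::string_view column, uint32_t new_node_id, bool autogen) {
        // e.g. record that `column` now also maps to `new_node_id`.
        (void)column;
        (void)new_node_id;
        (void)autogen;
    }
};

static_assert(ProjectionCallback<ExampleProjectionHandler>);
```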
It could make sense for this search wrapper implementation to live in either `clp_s::search` or `clp::ffi::ir_stream`, and either way it will likely require refactoring the `clp_s` build to turn at least the AST in `clp_s::search` into its own library.
As well, it might be worthwhile to offer kv-ir search in one of our binaries (such as `clp-s s`), but that can happen outside of an initial kv-ir search implementation.
High level search implementation
Search will operate by accepting a `clp_s::search::Expression`, simplifying it using the `OrOfAndForm`, `NarrowTypes`, and `ConvertToExists` transformation passes, and using the resulting expression to filter against records in the kv-ir stream.
The only tricky part of search (and projection) is writing an efficient dynamic column resolution implementation that can resolve columns as nodes are added to the schema tree throughout the stream, in order to maintain the following mapping:
`column -> set<matching node ids>`
(the set is just to indicate that we map each column to a unique set of matching nodes -- we can actually just use a `std::vector` for the underlying storage)
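For illustration, this mapping could be as simple as the following. The alias is keyed by the column's string form for brevity; in practice it would more likely be keyed by the column descriptor from the query AST.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical shape of the resolution result: each searched/projected column
// accumulates the ids of the schema-tree nodes it has resolved to so far. A
// vector suffices for the "set" as long as each node id is inserted only once.
using ColumnResolutions = std::unordered_map<std::string, std::vector<uint32_t>>;
```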
Generally the dynamic column resolution gets executed in a callback that is triggered whenever a node is added to the schema tree, and works something like the following:
```
def new_node_callback(parent_node_id, key_name, node_id, node_type):
    # No partial resolutions ended at the parent node, so nothing under it can match.
    if state[parent_node_id] is None:
        return
    for partial_resolution in state[parent_node_id]:
        result = try_resolve(partial_resolution, key_name, node_type)
        if result == MatchesFilterColumn:
            # The new node completes a column referenced by the filter.
            update_resolutions(partial_resolution, node_id)
        elif result == MatchesProjectionColumn:
            # The new node completes a projected column; notify the user.
            trigger_projection_callback(partial_resolution, node_id)
        elif result == PartialResolution:
            # Some tokens matched; record the progress for this node's children.
            add_to_state(node_id, generate_next_resolutions(partial_resolution))
```
We have a few options for how we represent the state vector that tracks partial resolutions for some node id.
The simplest would be a vector of `std::tuple<ColumnDescriptor*, (iterator indicating how many tokens matched so far)>` with some external metadata mapping `ColumnDescriptor*` -> what to do when resolved. This solution is likely good enough, especially considering that schema trees tend to be small, but we do have other options if this becomes a performance concern.
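For concreteness, a minimal sketch of that simple representation might look like the following. All names are placeholders, and `ColumnDescriptor` stands in for the descriptor type from the clp_s search AST.

```cpp
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Stand-in for clp_s::search::ColumnDescriptor; only used via pointer here.
struct ColumnDescriptor;

// What to do when a column finishes resolving against a new schema-tree node.
enum class ResolutionAction : uint8_t {
    AddToFilterResolutions,
    TriggerProjectionCallback,
};

// One partial resolution: which column we are resolving and how many of its
// tokens have matched along the path from the root to the current node.
struct PartialResolution {
    ColumnDescriptor const* column{nullptr};
    size_t num_tokens_matched{0};
};

// Per-node state: all partial resolutions that end at a given schema-tree node,
// plus external metadata describing what to do once a column fully resolves.
using NodeResolutionState
        = std::unordered_map<uint32_t, std::vector<PartialResolution>>;
using ResolutionActions
        = std::unordered_map<ColumnDescriptor const*, ResolutionAction>;
```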
In particular, if this becomes a performance concern, we can take all of the tokens in all of the columns we are trying to resolve (both from the query and projection) and merge them together into a trie of tokens. Each potential terminal state in the trie (corresponding to the last token in a column being resolved) would have some associated metadata indicating what to do when the node is matched. The state vector then would just be a vector of iterators into the trie -- since the trie can contain wildcard tokens, multiple positions in the trie can be partially resolved on some intermediate node id. The advantage of this trie-based approach is that we get to save computation/string matching when resolving multiple columns that share the same prefix tokens.
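A rough sketch of such a trie is shown below. For brevity it assumes `*` matches exactly one key; a real implementation would need to follow clp-s's full wildcard semantics for column descriptors, and all names here are illustrative only.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <vector>

struct TrieNode {
    std::map<std::string, std::unique_ptr<TrieNode>> children;
    std::unique_ptr<TrieNode> wildcard_child;  // child reached by a `*` token
    // Set when this node is the last token of a query/projection column;
    // identifies what to do once the column resolves (e.g. an index into an
    // action table).
    std::optional<uint32_t> action_id;
};

// Insert a tokenized column (e.g. {"a", "*", "c"}) into the trie.
void insert_column(
        TrieNode& root,
        std::vector<std::string> const& tokens,
        uint32_t action_id
) {
    TrieNode* node = &root;
    for (auto const& token : tokens) {
        auto& slot = ("*" == token) ? node->wildcard_child : node->children[token];
        if (nullptr == slot) {
            slot = std::make_unique<TrieNode>();
        }
        node = slot.get();
    }
    node->action_id = action_id;
}

// Advance a set of partial resolutions (positions in the trie) by one key. A
// shared prefix is matched once regardless of how many columns contain it.
std::vector<TrieNode const*> advance(
        std::vector<TrieNode const*> const& positions,
        std::string const& key
) {
    std::vector<TrieNode const*> next;
    for (auto const* node : positions) {
        if (auto it = node->children.find(key); it != node->children.end()) {
            next.push_back(it->second.get());
        }
        if (nullptr != node->wildcard_child) {
            next.push_back(node->wildcard_child.get());
        }
    }
    return next;
}
```

With this layout the per-node state shrinks to a `std::vector<TrieNode const*>`, and any position whose `action_id` is set corresponds to a fully resolved filter or projection column.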
Either way, this dynamic column resolution algorithm will allow us to maintain the necessary `column -> set<matching node ids>` mapping.
To scan each record we have the option of separating the process into a schema resolution stage and a filtering stage, or simply performing both in one pass. Tentatively I prefer performing both in one pass because (1) it is simpler and (2) we have no real evidence that one approach will be faster than the other.
To perform both at the same time, we would walk the search AST and perform the following for each leaf node:
```
def filter(filter_expr, record):
    matching_node_id = None
    for node_id in matching_nodes(filter_expr.column):
        if node_id in record:
            matching_node_id = node_id
            break
    if matching_node_id is None:
        # None of the nodes this column resolved to appear in the record.
        return PruneAstSubtree
    return execute_filter(record, matching_node_id, filter_expr)
```
Where we treat `PruneAstSubtree` as a special kind of false value that will eliminate AND subtrees and the relevant branch of OR subtrees regardless of expression inversion.
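One way to realize that special false value is a three-valued result whose combinators absorb pruning, sketched below. The exact interaction with NOT/AND/OR here is one interpretation of the rule above, and the names are illustrative only.

```cpp
#include <algorithm>
#include <vector>

enum class FilterResult { True, False, Prune };

// NOT: inversion applies to True/False, but a pruned subtree stays pruned --
// a column that doesn't exist in the record can never match, inverted or not.
FilterResult negate(FilterResult r) {
    switch (r) {
        case FilterResult::True: return FilterResult::False;
        case FilterResult::False: return FilterResult::True;
        default: return FilterResult::Prune;
    }
}

// AND: any pruned child eliminates the whole subtree; otherwise ordinary AND.
FilterResult evaluate_and(std::vector<FilterResult> const& children) {
    if (std::any_of(children.begin(), children.end(), [](FilterResult r) {
            return FilterResult::Prune == r;
        })) {
        return FilterResult::Prune;
    }
    bool const all_true = std::all_of(children.begin(), children.end(), [](FilterResult r) {
        return FilterResult::True == r;
    });
    return all_true ? FilterResult::True : FilterResult::False;
}

// OR: pruned children are simply dropped; if every child is pruned the whole
// OR subtree is eliminated as well.
FilterResult evaluate_or(std::vector<FilterResult> const& children) {
    bool saw_unpruned_child = false;
    for (auto r : children) {
        if (FilterResult::Prune == r) {
            continue;
        }
        saw_unpruned_child = true;
        if (FilterResult::True == r) {
            return FilterResult::True;
        }
    }
    return saw_unpruned_child ? FilterResult::False : FilterResult::Prune;
}
```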
Array search
Array search will be somewhat tricky and needs some thought -- the main issues are:
- If some prefix of a column matches an array column we need to dynamically expand the search AST to try to resolve its suffix within the array
- Archive search doesn't handle this dynamic expansion properly when using unstructured arrays
- Archive search doesn't allow wildcards in columns when searching against unstructured arrays
In fact, the SchemaMatch pass has the following comment indicating that this case doesn't work for unstructured arrays:
```cpp
/**
 * TODO: This doesn't work in general, but it had the same limitation in the previous
 * implementation, so I will leave it broken for now.
 *
 * E.g. breaks for a query like `a.b.c:d` on the collection of objects
 * {"a": [{"b": {"c": "d"}}]}
 * {"a": {"b": [{"c": "d"}]}}
 */
```
We should be able to handle the array dynamic expansion issue for kv-ir search without actually dynamically modifying the AST by maintaining a mapping `column* -> vector<{array node_id, metadata about suffix to match}>`, where we treat the vector as an OR of possibilities when evaluating a FilterExpr at the bottom level of the AST.
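A minimal sketch of that mapping, with illustrative field names (the suffix metadata in particular is a placeholder; it could be pre-tokenized strings, a compiled matcher, etc.):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for the query's column descriptor type; only used via pointer here.
struct ColumnDescriptor;

// One candidate interpretation of a column whose prefix resolved to an array:
// the array's schema-tree node id plus the remaining tokens to match inside it.
struct ArraySuffixMatch {
    uint32_t array_node_id{0};
    std::vector<std::string> suffix_tokens;
};

// Each column maps to its set of (array node, suffix) possibilities; the vector
// is treated as an OR when the enclosing FilterExpr is evaluated.
using ArrayResolutions
        = std::unordered_map<ColumnDescriptor const*, std::vector<ArraySuffixMatch>>;
```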
It probably makes the most sense to either go with this implementation or postpone kv-ir array search support until the archive array search bug is fixed.