Druid nested data columns #12695

Open · clintropolis opened this issue Jun 23, 2022 · 4 comments

clintropolis (Member) commented Jun 23, 2022

Motivation

Apache Druid has quite a lot of tricks up its sleeve for providing extremely fast queries on very large datasets. However, one of the major limitations of the current system is that this only works on completely flattened data, since that is all Druid segments are currently able to natively store (and table-to-table join support is limited). Achieving this flattened table requires either external transformation or the built-in 'flattening' that Druid ingestion supports, in order to pluck specific nested values and translate them into top-level columns within a segment.

This, however, has a downside: the exact set of extractions to perform must be completely known up front, prior to ingestion, which is especially hard (if not impossible) to deal with in the case of loosely structured data whose schema might vary row to row. Additionally, this structure is often interesting in itself, illustrating relations between values, and it is lost completely when transformed into flattened Druid tables without careful naming.

To overcome this, this proposal focuses on building out the capability to store nested and structured data directly as it is, and to query nested fields within this structure without sacrificing the performance available to queries operating on traditional flattened Druid columns.

Proposed changes

To achieve this, we will introduce a new type of column for storing structured data in Druid segments. The initial implementation centers on leaning heavily into what we already know Druid does very well, taking an approach I like to refer to as "a bunch of columns in a trench coat".

This column is built on top of Druid's 'complex' type system, which allows complete control over how columns are encoded and decoded, and on virtual columns, which allow building specialized value selectors for the nested fields through VirtualColumn implementations. At ingestion time, all 'paths' in the structured data which contain a 'literal' field (Druid STRING, LONG, or DOUBLE) will be split out into internal 'nested field literal' columns, and stored in a manner similar to how we store normal literal columns, complete with dictionary encoding and bitmap value indexes.
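
For example (a hedged sketch; the table and column names here are hypothetical), a row like {"x": {"y": 10, "z": "hello"}} would produce internal nested field literal columns for the paths $.x.y (LONG) and $.x.z (STRING), and the JSON_PATHS function described below could be used to list them:

-- hypothetical table/column; JSON_PATHS is described in the SQL functions section below
SELECT JSON_PATHS(nested_col)
FROM hypothetical_table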

To prove feasibility, I've been prototyping this functionality for a bit over 6 months now, making core improvements to the complex type system and index functionality along the way as needed, and testing with a variety of different workloads. This effort is a spiritual successor to the 'map-string-string' column of #10628, except that instead of being limited to one level deep with only strings, this proposal allows any level of nesting and supports the complete set of Druid literal types. Several important core changes along the way have made this feature possible.

Additionally, the investigation in #12277 is inspired by the changes proposed here (which should become apparent shortly).

Column format

Internally, the nested column is structured as a main column file in the smoosh, plus several associated "internal" files for every nested literal field in the structure. All literal fields are dictionary encoded, but unlike our dictionary encoded STRING columns, they share a value dictionary that is 'global' to all of the nested columns. The global value dictionaries are split by type and stacked (strings are ids 0 through m, longs m + 1 through n, doubles n + 1 to the end). Locally, each nested column has a dictionary which maps local dictionary ids to these global dictionary ids (int -> int), so value lookup is a 2 step operation: local to global, then global to value.

The complex column is composed of:

  • compressed, 'raw' representation of the structured data
  • bitmap to indicate which rows are null values
  • a list of all 'literal' nested columns contained in the structure
  • type information for all 'literal' nested columns contained in the structure
  • global value dictionaries for all 'literal' values that are shared between all nested columns

The nested field literal columns contain:

  • local to global integer dictionary
  • local dictionary encoded compressed integer value column
  • bitmap value indexes
  • for numeric columns, compressed numeric value columns

(attached diagram: nested column and internal file layout)

Querying

Querying will be done primarily through a specialized VirtualColumn, which will create optimized selectors to read the nested fields. These will look a lot like the standard Druid column selectors for other types, though with some subtle differences.

These VirtualColumn implementations will also be wired up to SQL functions to allow nested data to be queried with ease. The initial set of functions will be a standard-ish set of JSON based functions:

SQL functions

  • JSON_KEYS(expr, path): Returns an array of field names from expr at the specified path.
  • JSON_OBJECT(KEY expr1 VALUE expr2[, KEY expr3 VALUE expr4, ...]): Constructs a new COMPLEX<json> object. The KEY expressions must evaluate to string types. The VALUE expressions can be composed of any input type, including other COMPLEX<json> values. JSON_OBJECT can accept colon-separated key-value pairs; the following syntax is equivalent: JSON_OBJECT(expr1:expr2[, expr3:expr4, ...]).
  • JSON_PATHS(expr): Returns an array of all paths which refer to literal values in expr, in JSONPath format.
  • JSON_QUERY(expr, path): Extracts a COMPLEX<json> value from expr at the specified path.
  • JSON_VALUE(expr, path [RETURNING sqlType]): Extracts a literal value from expr at the specified path. If you specify RETURNING and an SQL type name (such as VARCHAR, BIGINT, DOUBLE, etc), the function plans the query using the suggested type. Otherwise, it attempts to infer the type based on the context. If it can't infer the type, it defaults to VARCHAR.
  • PARSE_JSON(expr): Parses expr into a COMPLEX<json> object. This operator deserializes JSON values when processing them, translating stringified JSON into a nested structure. If the input is not a VARCHAR or it is invalid JSON, this function will result in an error.
  • TRY_PARSE_JSON(expr): Parses expr into a COMPLEX<json> object. This operator deserializes JSON values when processing them, translating stringified JSON into a nested structure. If the input is not a VARCHAR or it is invalid JSON, this function will result in a NULL value.
  • TO_JSON_STRING(expr): Serializes expr into a JSON string.
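
A hedged sketch of how these functions could be combined in a single query (the events table, nested_col, and paths are hypothetical):

SELECT
  JSON_VALUE(nested_col, '$.session.duration' RETURNING BIGINT) AS duration,
  JSON_QUERY(nested_col, '$.session') AS session_obj,
  JSON_KEYS(nested_col, '$.session') AS session_keys,
  JSON_OBJECT(KEY 'os' VALUE JSON_VALUE(nested_col, '$.agent.os')) AS rebuilt_agent,
  TO_JSON_STRING(nested_col) AS raw_json
FROM events
WHERE JSON_VALUE(nested_col, '$.agent.os') = 'linux'
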
JSONPath syntax

Initially we will support only a small simplified subset of the JSONPath syntax operators, primarily limited to extracting individual values from nested data structures.

  • $ : 'Root' element; all JSONPath expressions start with this operator
  • .<name> : 'Child' element in 'dot' notation
  • ['<name>'] : 'Child' element in 'bracket' notation
  • [<number>] : 'Array' index

In the future, we will likely expand on this.
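
A hedged sketch of the supported operators in use (column and field names hypothetical; the single quotes in the bracket notation are doubled because they sit inside a SQL string literal):

SELECT
  JSON_VALUE(nested_col, '$.x.y') AS dot_notation,
  JSON_VALUE(nested_col, '$[''x''][''y'']') AS bracket_notation,
  JSON_VALUE(nested_col, '$.tags[0]') AS array_index
FROM events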

Ingestion

During ingestion, a new nested column indexer will process nested data from input rows, traversing the structure and building a global dictionary of all literal values encountered. At persist time, this dictionary is sorted, and the 'raw' data is serialized with SMILE encoding into a compressed column. As we serialize the rows, we traverse the nested structure again, this time with the sorted dictionary in hand, and write out the nested literal field columns into temporary files, building local value dictionaries in the process. Once the 'raw' column is complete, we iterate over the nested literal columns, sort their local dictionaries, and write out their finished columns: compressed dictionary encoded value columns, compressed numeric value columns for numeric types, the local dictionaries, and bitmap value indexes.

The nested data column indexer will be specified via a new DimensionSchema type, initially using json as the type since the initial implementation only supports the JSON format. The indexer will process whatever rows are pointed at it (even literals).

{
  "type": "json",
  "name": "someNestedColumnName"
}

That's basically it. For convenience when working with text input formats like TSV, if all processed rows are string literals and the data looks like JSON, the indexer will try to deserialize it as JSON.

Additionally, we will add a handful of native Druid expressions (which will also handle composition at query time) that can perform many of the operations currently done via flattenSpec, but through transformSpec instead.

"transformSpec": {
  "transforms": [
    { "type": "expression", "name": "transformedJson", "expression": "json_value(someNestedColumnName, '$.x.y')" }
  ]
}
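
For comparison, a hedged sketch of the same extraction performed at query time through the SQL layer (the datasource name is hypothetical):

SELECT JSON_VALUE(someNestedColumnName, '$.x.y') AS transformedJson
FROM someDatasource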

Native expressions

  • json_value(expr, path): Extract a Druid literal (STRING, LONG, DOUBLE) value from expr using the JSONPath syntax of path
  • json_query(expr, path): Extract a COMPLEX<json> value from expr using the JSONPath syntax of path
  • json_object(expr1, expr2[, expr3, expr4 ...]): Construct a COMPLEX<json> from alternating 'key' and 'value' arguments
  • parse_json(expr): Deserialize a JSON STRING into a COMPLEX<json>. If the input is not a STRING or it is invalid JSON, this function will result in an error.
  • try_parse_json(expr): Deserialize a JSON STRING into a COMPLEX<json>. If the input is not a STRING or it is invalid JSON, this function will result in a NULL value.
  • to_json_string(expr): Convert expr into a JSON STRING value
  • json_keys(expr, path): Get an array of field names from expr at the specified JSONPath path, or null if the data does not exist or has no fields
  • json_paths(expr): Get an array of all JSONPath paths available from expr

Rationale

I believe the utility of being able to store nested structures is obvious, beyond flattenSpec and up-front ETL being inflexible and complicated. As to why this implementation was chosen for the initial effort, it comes down to starting with what we know and mapping Druid's current capabilities onto a nested structure. There is a lot of room for experimentation after this initial implementation is added, especially in the realm of storage format, as there are a wide variety of approaches to storing this type of data. The proposed implementation will have the same strengths and weaknesses as standard Druid queries, but with the initial implementation in place, we will have a point of comparison for further investigation.

Operational impact

The expense of nested column ingestion is correlated with the complexity of the schema of the nested input data. The majority of the expense happens when serializing the segment (persist/merge), so these operations will take longer than normal for complex schemas, and could require additional heap and disk. Each nested literal field is roughly an additional column, and we're building them all at the end of the process on the fly while persisting the 'raw' data. Additionally, while I've gone through a few iterations so far, the current ingestion algorithm is still rather expensive and could use additional tuning, especially in regard to the number of temporary files involved.

Segments with nested data columns will likely be larger than normal, because the 'raw' data is retained. This data is compressed, but still takes up a decent amount of disk space. The good news is that since the literal values have their own nested columns, strictly using JSON_VALUE should avoid reading these large 'raw' columns and prevent them from thrashing the page cache. Future work will allow customization of exactly what is stored in nested columns, to give operators a chance to reduce these segment sizes.
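
To make this concrete (a hedged sketch; table, column, and paths are hypothetical): a query that touches the nested data only through JSON_VALUE can be served from the internal nested field columns and their indexes, while a query that returns nested objects has to read the compressed 'raw' column:

-- served from the internal nested field literal columns and bitmap indexes
SELECT SUM(JSON_VALUE(nested_col, '$.metrics.bytes' RETURNING BIGINT)) AS total_bytes
FROM events
WHERE JSON_VALUE(nested_col, '$.agent.os') = 'linux'

-- must decompress the 'raw' column to reassemble the object
SELECT JSON_QUERY(nested_col, '$.metrics') AS metrics
FROM events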

Additionally, since this introduces a new column type, these columns will be unavailable when rolling back to older versions.

Test plan

The surface area of this feature is quite large, since it effectively allows the full functionality of segments within a single column, along with several ways of interacting with this data. JSON_VALUE in particular can be used like any other Druid column across all query types (grouping, filtering, aggregation, etc). Quite a lot of testing has been done so far, including a bit of stress testing, and I've internally gone through a handful of iterations on the code, but work will need to continue on hardening the feature. Because the column format is versioned, we should be able to iterate freely without impacting existing data. Unit test coverage in my prototype is currently pretty decent, so the main focus of testing now will be on 'production'-ish use cases, observing how well things perform and looking for incremental improvements.

Future work

ingestion performance improvements

This area needs some work to improve overall performance and optimize resource usage. For example, the usage of temporary files could be adjusted more dynamically based on projected sizes, only splitting column components into separate internal files when necessary.

automatic typing for schema-less ingestion

The nested columns could be improved to give Druid schema-less ingestion automatic type discovery. All discovered columns could be created with a nested data indexer, and at serialization time the persistence code could recognize single-typed columns containing only 'root' literal values, rewriting the type and writing out a standard Druid literal column. The primary work here would be making this work seamlessly with realtime queries, allowing realtime selectors to supply a value selector on the root literal value instead of the 'raw' data selector.

literal arrays

While the current proposal can process and store array values, it does not include the ability to interact with them as native Druid ARRAY types or to use the associated functions. Arrays of literal values could be stored as specialized nested columns (instead of a nested column for each array element).

JSONPath wildcards

Interaction with arrays could also be improved by introducing support for wildcards in our JSONPath syntax, to allow selecting an array of values instead of being limited to selecting specific array elements. This would make arrays significantly more useful.

better general array handling

Druid support for ARRAY types is growing, but still could use some improvement. In particular, an UNNEST function to allow turning an array of values into a column of values would unlock a lot of functionality when interacting with nested arrays.

better complex dimension handling, grouping, filtering, aggregation

Druid support for direct usage of COMPLEX types is still rather limited, and I want to work on improving this to make using nested data columns a more pleasant experience. This includes allowing direct grouping (the 'raw' values, like any variably sized type, could use a dictionary building strategy in the grouping engines). The filtering system could allow complex types to better participate in indexes and value matching. The current workaround is to use TO_JSON_STRING to stringify these values into a type that Druid can work with, but I think we can eliminate this need in the future.
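
For reference, a hedged sketch of the current TO_JSON_STRING workaround for grouping on a nested object (table, column, and path are hypothetical):

SELECT
  TO_JSON_STRING(JSON_QUERY(nested_col, '$.device')) AS device,
  COUNT(*) AS cnt
FROM events
GROUP BY 1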

formal Druid type instead of complex

It might be useful to consider switching from using generic COMPLEX types and promote the nested data type into a top level Druid type and call it something like OBJECT or STRUCT or ... something. This would allow various parts of the engine to take a more active stance on how nested types are handled, and allow tighter integration with various pieces. I'm not certain if this is strictly necessary at this point, just something I've been thinking about.

support for ingesting from other nested formats (Parquet, Avro, ORC)

The nested column implementation is not specific to JSON, so supporting other data formats would give us near full feature parity with the flattenSpec, allowing it to be deprecated.

customized control over ingestion (which fields to extract, which fields to index, retain raw data, etc)

Fine tuned control over how the nested data indexer produces columns would allow for retaining a larger blob of data but only extracting a specific set of columns to be 'optimized' to support use with JSON_VALUE and filtering with indexes, allowing the other columns to fall back to the 'raw' data. We could also allow omitting the 'raw' data, and instead opt to reconstruct it on the fly from the nested columns. Additionally, indexes might not be that useful on all nested columns, so control over which fields are indexed for fast filtering would be useful. All of these options would give operators a way to control the output size of nested columns.

bring these enhancements to normal numeric columns

Nested numeric columns have both a numeric value column and a dictionary encoded column and bitmap indexes. This allows for both fast aggregation and fast filtering in exchange for additional storage space. These improvements can be folded into Druid LONG, DOUBLE, and FLOAT columns to allow operators to optionally specify creating indexes for numeric values.

alternative storage formats

There is a lot of room for exploration on alternative storage formats to suit various nested data use cases. For example, in cases where the structure is interesting and it is likely that a collection of nested fields will be taking part in the same query often, it might make sense to explore formats that allow compressing the values of these columns together into a single column (a fixed width row oriented format), allowing lower overhead to read multiple values in the same query (whether or not this is actually better would need proving). That said, I don't really have anything specific in mind in this area, just throwing it out there as an area of interest.

@FrankChen021 (Member) commented:

Hi @clintropolis, introducing a JSON type is a very big move for an analytic database, and I appreciate your constant effort on this.

I have some personal opinions at the SQL function level that I want to share with you.

  1. When using the json_value function to extract a value, the function does not know the actual type of the value in advance, and there might be some problems. For example,
SELECT json_value(a_json_column, "a") / 5

In this example, a LONG or DOUBLE type is expected as the return value of json_value. But if one row accidentally stores a as a STRING (this is very common, because some JSON serialization tools serialize LONG and DOUBLE values in a quoted format that other deserialization tools recognize as strings), the above expression fails to calculate.

Or for

SELECT json_value(a_json_column, "b") + json_value(a_json_column, "c")

does it perform a math calculation or string concatenation? It's not clear; only the writer of the SQL knows.

I think a better way is to make the json_value function more specific, such as json_value_long, json_value_double and json_value_string.

  2. I don't know if json_value and json_query are the final SQL function names in your implementation, but I think they're a little confusing for people when they first encounter these functions. json_extract is a more widely used function name in other databases like MySQL, SQLite, ClickHouse, etc., so I think we can use it so people can easily tell what these functions do.

So, I would like to propose the function names as: json_extract_long, json_extract_double, json_extract_string, json_extract_object.

  3. The final one is about the performance of applying JSONPath in the above functions. There is a PR (#11467, "Functions that extract a value from a JSON-encoded string") that compares the JSONPath-based implementation with a hand-written implementation, and it turns out the latter has much better performance. I'm not saying the hand-written implementation should be used; I just want to give you a hint.

@clintropolis (Member, Author) commented:

Hi @FrankChen021, thanks for having a look. Sorry, I haven't finished filling out the PR description for #12753 yet; it might have answered some of your questions.

When using the json_value function to extract a value, the function does not know the actual type of the value in advance, and there might be some problems.

For this, I have actually picked up a syntax that was implemented in the Calcite parser for these functions that allows specifying the type inline, looking something like

JSON_VALUE(x, '$.some.nested' RETURNING BIGINT)

The way I have implemented things during SQL conversion will also "eat" cast operations, so wrapping JSON_VALUE in a CAST will plan to the same native virtual column as the RETURNING syntax would, decorated with the expected type. I don't know how standard this part of the syntax is, but it came for free from using these functions in Calcite.
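
In other words (a sketch based on the description above; the column and path are hypothetical), both of these forms should plan to the same native virtual column:

-- 'RETURNING' form
SELECT JSON_VALUE(x, '$.some.nested' RETURNING BIGINT) FROM t

-- equivalent CAST form, which the SQL conversion "eats" into the same plan
SELECT CAST(JSON_VALUE(x, '$.some.nested') AS BIGINT) FROM t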

Underlying JSON_VALUE is a thing called the NestedFieldVirtualColumn, which can make optimized typed selectors as asked for by the expected type, as well as supply the column indexes and everything else that provides 'native' Druid performance.

I don't know if json_value and json_query are the final SQL function names in your implementation, but I think they're a little confusing for people when they first encounter these functions. json_extract is a more widely used function name in other databases like MySQL, SQLite, ClickHouse, etc., so I think we can use it so people can easily tell what these functions do.

I did some surveying as well, and JSON_VALUE and JSON_QUERY are also relatively widely used; in fact it looks like even ClickHouse supports them (as do BigQuery, MS SQL, Oracle, etc). That said, it is relatively easy to add additional functions at the SQL layer, including optimized functions that plan into the NestedFieldVirtualColumn. You can see two of them still hanging out in my PR: GET_PATH and JSON_GET_PATH were my initial prototype functions and use 'jq'-style paths instead of JSONPath (see NestedDataOperatorConversions).

The final one is about the performance of applying JSONPath in the above functions. There is a PR (#11467) that compares the JSONPath-based implementation with a hand-written implementation, and it turns out the latter has much better performance. I'm not saying the hand-written implementation should be used; I just want to give you a hint.

Heh, not to worry, I actually also wrote my own JSONPath parser (and a jq parser) that only handles the subset of functionality we actually support for these columns (see NestedPathFinder). I did miss #11467, though I guess there is a bit of difference between what's going on in that PR, which looks like it has functions to handle stringified JSON inputs, and what I'm proposing here, which is a new column type that decomposes the JSON at ingest time into indexed nested columns. I'll have a look to see if there is any overlap. Thanks for pointing this out.

@FrankChen021 (Member) commented:

For #11467, if all your proposed functions are provided, there's no need to merge that PR. Using your json_value and parse_json together will cover the functions proposed in that PR.

@achimbab commented:

@FrankChen021 @clintropolis
Good news. Although my PR (#11467) was not accepted, it is encouraging that it could provide hints on the performance issues. Great job.
