Skip to content

push JSON Path evaluation down to storage layer#7820

Merged
Jackie-Jiang merged 1 commit intoapache:masterfrom
richardstartin:jsonpath-pushdown
Dec 6, 2021
Merged

push JSON Path evaluation down to storage layer#7820
Jackie-Jiang merged 1 commit intoapache:masterfrom
richardstartin:jsonpath-pushdown

Conversation

@richardstartin
Copy link
Member

@richardstartin richardstartin commented Nov 23, 2021

Pushes JSON path evaluation down to the storage layer (giving direct access to dictionaries and forward index) which avoids various intermediate materialisations of strings, byte arrays and so on. The benefit to users is the potential to avoid a lot of allocation of large byte[] and String once the JsonPath library can accept UTF-8 encoded byte[]. This also creates an SPI to make the evaluation logic pluggable. The same pushdown mechanism could be abstracted to make extensible to regular expressions etc. in the future.

@richardstartin richardstartin force-pushed the jsonpath-pushdown branch 3 times, most recently from 6cba1f3 to 7a24e82 Compare November 23, 2021 22:33
@codecov-commenter
Copy link

codecov-commenter commented Nov 23, 2021

Codecov Report

Merging #7820 (fa0219f) into master (e572ea0) will decrease coverage by 1.39%.
The diff coverage is 39.92%.

❗ Current head fa0219f differs from pull request most recent head a89359a. Consider uploading reports for the commit a89359a to get more accurate results
Impacted file tree graph

@@             Coverage Diff              @@
##             master    #7820      +/-   ##
============================================
- Coverage     71.65%   70.26%   -1.40%     
- Complexity     4080     4084       +4     
============================================
  Files          1581     1583       +2     
  Lines         81350    81849     +499     
  Branches      12128    12237     +109     
============================================
- Hits          58293    57509     -784     
- Misses        19117    20397    +1280     
- Partials       3940     3943       +3     
Flag Coverage Δ
integration1 ?
integration2 27.69% <11.32%> (-0.11%) ⬇️
unittests1 68.52% <39.92%> (-0.29%) ⬇️
unittests2 14.44% <0.00%> (-0.08%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...re/common/evaluators/DefaultJsonPathEvaluator.java 31.44% <31.44%> (ø)
...m/function/JsonExtractScalarTransformFunction.java 22.49% <35.64%> (-27.04%) ⬇️
...a/org/apache/pinot/core/common/DataBlockCache.java 91.44% <60.00%> (-4.77%) ⬇️
...he/pinot/core/operator/blocks/ProjectionBlock.java 60.00% <60.00%> (ø)
...ransform/function/IdentifierTransformFunction.java 69.76% <60.00%> (-8.50%) ⬇️
...java/org/apache/pinot/core/common/DataFetcher.java 85.71% <60.97%> (-4.25%) ⬇️
...segment/spi/evaluator/json/JsonPathEvaluators.java 61.11% <61.11%> (ø)
...pinot/minion/exception/TaskCancelledException.java 0.00% <0.00%> (-100.00%) ⬇️
...nverttorawindex/ConvertToRawIndexTaskExecutor.java 0.00% <0.00%> (-100.00%) ⬇️
...e/pinot/common/minion/MergeRollupTaskMetadata.java 0.00% <0.00%> (-94.74%) ⬇️
... and 105 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e572ea0...a89359a. Read the comment docs.

@richardstartin
Copy link
Member Author

The try/catches can be removed if json-path/JsonPath#767 is merged and released.

@richardstartin richardstartin force-pushed the jsonpath-pushdown branch 4 times, most recently from e362ace to 2b630fc Compare November 26, 2021 12:27
@kishoreg
Copy link
Member

I like the concept of the pushdown evaluator. Few thoughts/concerns

  • Some methods added to projectionBlock seem to be specific to projectionBlock and is deviating from the block concept of chaining any blocks together i.e. the caller of the block does not understand the implementation of the underlying block. We have violated this in few places and we should fix that. It might be a good idea to enhance the Block interface to allow a pushdown evaluator.
  • Conceptually, we are moving from a Column to Column + path but modelling the path as an evaluator. But the query layer is deciding the evaluator instead of the storage layer.
  • The evaluator seems to be set at the JVM level but in practice, it's possible for each table to have a different evaluator and in fact, it's possible that each segment can have a different evaluator.

At a high level, the segment is the smallest unit of query processing in Pinot and most of the things can be decided independently at that scope - segmentDirectory, readers, writers, Blocks, indexes, encoding etc.

Having said that, I don't have a very good solution yet but I think we can come up with something after a couple of iterations. One thought is enhancing the concept (String Column) to a Column Object which has SingleColumn, NestedColumn( column + path) as implementations.

Overall, I think the Storage (Reader) should decide on the evaluator instead of the query layer.

@richardstartin
Copy link
Member Author

richardstartin commented Nov 29, 2021

Conceptually, we are moving from a Column to Column + path but modelling the path as an evaluator. But the query layer is deciding the evaluator instead of the storage layer.

The evaluator seems to be set at the JVM level but in practice, it's possible for each table to have a different evaluator and in fact, it's possible that each segment can have a different evaluator.

The query layer does not decide, the query layer just produces the evaluator from the query. It does not know what evaluator implementation it is producing, just whatever has been registered. It just won’t push the evaluator down unless the identifier is a column identifier rather than an expression.

The evaluator, having captured information from the query, is pushed down to the storage layer. The storage layer then presents storage layer capabilities (I.e. dictionary + forward index) to the evaluator, which can interrogate and choose what to do with them.

Think of the evaluator as a mediator between the query and storage layers for which any implementation can be registered via SPI. It encapsulates the logic of the function, and needs to know all relevant storage capabilities present in the deployment. The right evaluator to register for a given Pinot deployment is the one that matches all storage capabilities in the deployment.

The evaluator must decide how to evaluate its function based on what storage capabilities it is presented with in evaluateBlock. If evaluateBlock is presented with, say, a BytesDictionary, it must be able handle extracting JSON encoded as bytes from the dictionary and evaluating a JSON path against it, just as it must be able to handle reading JSON from a raw forward index, or from whatever ForwardIndexReader from whatever storage plugin it is presented with.

To illustrate the evaluator’s responsibility to understand the storage, the DefaultJsonPathEvaluator in this PR knows how to work with all storage capabilities for storing JSON currently present in Pinot (dictionarised string, bytes, raw string, bytes, JSON type column or not) but adding a storage plugin requires creating a new evaluator which understands the custom storage but delegates to a default evaluator for everything else, and this evaluator must be registered via the SPI.

This has the ergonomic benefit of being able to implement storage plugins which deviate from the ForwardIndexReader interface. Completely new access patterns and the necessary interfaces can be implemented without changing the ForwardIndexReader interface:

  1. Implement new ForwardIndexReader with a previously inconceivable API
  2. Implement an evaluator which uses the new API for instances of that ForwardIndexReader, otherwise delegate to default implementation
  3. Register the evaluator
  4. (Optional) later enhance the ForwardIndexReader interface with method supporting a tried and proven access pattern, explain to everybody why it’s a good idea with data to refer to

This circumnavigates the need to form design consensus before having a proven storage implementation and should reduce the number of changes to the ForwardIndexReader interface to those which have already been proven to make sense, while never touching the query layer once the relevant TransformFunction has been modified to push an evaluator down.

@richardstartin
Copy link
Member Author

Conceptually, we are moving from a Column to Column + path but modelling the path as an evaluator. But the query layer is deciding the evaluator instead of the storage layer.

Specifically on this, not really, the model has the potential to be far more generic than that. It’s more going from column to column + parameters + arbitrary sequence of storage layer API calls.

This is good, because within the context of an evaluator, the most efficient API calls can be made. Case in point: in DefaultJsonPathEvaluator it’s possible to avoid converting byte[] to String. It would also be possible to use hypothetical new zero copy (but unsafe) methods to read bytes on the basis that we don’t need to create a vector of JSON records, but a vector of JSON evaluation results. Individual evaluator implementations can be arbitrarily efficient (and complex) in a way that prevents the query layer from ever seeing any of this complexity, keeping the query layer clean just as it avoids trying to model all of the complex access patterns we might like in the storage layer with a single interface.

@richardstartin
Copy link
Member Author

@kishoreg please see the updated Javadoc which provides more colour.

@siddharthteotia you expressed some interest in this PR, would you mind taking a look please?

@richardstartin
Copy link
Member Author

Some methods added to projectionBlock seem to be specific to projectionBlock and is deviating from the block concept of chaining any blocks together i.e. the caller of the block does not understand the implementation of the underlying block. We have violated this in few places and we should fix that. It might be a good idea to enhance the Block interface to allow a pushdown evaluator.

@kishoreg this has been deviated from significantly already, and in many Block implementations there would be no sensible implementation but to throw UnsupportedOperationException which breaks LSP anyway.

I also want to call out that there appears to be an element of complexity which you may have missed: TransformFunction isn't guaranteed to operate on directly on storage, it may also operate on the output of some other JSON function (I added a test for this) - when that happens, there needs to be an implementation which can operate on values produced by another TransformFunction. This (to push down or evaluate directly) is a decision which can only be made in the query layer. I'm not sure if you didn't mention this because it makes sense already or not.

@richardstartin richardstartin force-pushed the jsonpath-pushdown branch 2 times, most recently from e343b41 to db04a8c Compare November 30, 2021 12:57
@richardstartin richardstartin force-pushed the jsonpath-pushdown branch 2 times, most recently from 220c16c to 909cd9c Compare December 1, 2021 17:04
@kishoreg
Copy link
Member

kishoreg commented Dec 1, 2021

Thanks @richardstartin. I did pull the branch and reviewed the code. Given the state of the current code, I think what you have done is probably the least invasive change. I will list my wishful state to get to here

public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends Closeable {

  boolean isNested();

  boolean isStructured();

  List<String> getNestedFields();

  ForwardIndexReader<T> getReader(String nestedField); or 
  ForwardIndexReader<T> getReader(Extractor/Evaluator e); or
  ForwardIndexReader<T> getReader(Function pushDownFunction); or
  ForwardIndexReader<T> getReader(Selector selector); or
}

But passing parameters down the Block abstraction is non-trivial given the state of the current code.

One request from my side would be to add javadoc the newly added methods with the evaluator as Beta that we will mark it stable once we get everything working end to end.

Thanks again for taking the time to explain

@richardstartin
Copy link
Member Author

@kishoreg Specifying in Javadoc that these are unstable APIs is an excellent compromise. I think it should be possible to evolve towards what you want. I must admit that I don't see it right now, but I'm trying hard to find a way both to implement the suggestion and am thinking about the benefits. I want to make it very easy to implement storage plugins (as an idea incubation mechanism) without needing community consensus on the shape of SPIs like ForwardIndexReader which will affect a lot of stakeholders. A delegation chain of TransformEvaluators, marked as an unstable interface, feels like a good way to iterate without stalling for consensus or imposing churn on stakeholders.

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the data fetch has the buffers maintained within the BlockValSet. The benefit of doing so is that buffer for the same expression can be shared across different functions (through DataBlockCache). The disadvantage is that we need to keep the buffers for all unique expressions in memory, and map lookup is required to read the shared buffer.

In this PR, the buffer is maintained within the transform function, and directly pushed down to the forward index level to fill the values. It can be more efficient as the values are directly filled into the buffer, but values for the same expression in different functions need to be read multiple times (cache is bypassed).

Both ways have their own pros and cons, but we should try to stick with on of them, or it can be quite confusing, and we have to maintain 2 sets of different read apis.

To make the evaluator push down using the first approach, we can potentially extend public BlockValSet getBlockValueSet(String column) to public BlockValSet getBlockValueSet(String column, @Nullable TransformEvaluator evaluator), and embed the evaluator into the DataBlockCache.

Or if we found the first approach is adding unnecessary overhead, we can remove the cache and always push down the buffer.

IMO I can see several use cases benefitting from the cache, and moving to the second approach might cause regression.

@richardstartin
Copy link
Member Author

@Jackie-Jiang the reason I bypassed the cache is because the cache would need a composite key to avoid different JSON path transformations sharing results spuriously. It’s not a tangential optimisation to get rid of a few buffers or array copies. Could this be handled differently without the cache by reordering the execution graph to deduplicate common subexpressions? From my perspective, it doesn’t feel like a huge departure to allow some expressions to bypass the cache, and I don’t think it should be necessary for all expressions to bypass the cache if some do.

@richardstartin
Copy link
Member Author

@Jackie-Jiang to demonstrate an open mind, I've implemented your suggestion here: f751358

I'm not convinced this is necessarily cleaner (ProjectionBlock is though) and I think it was better to keep the "tunnel" for evaluators separate from the regular path with caching. Writing the array copies from the enormous array maintained by DataBlockCache (because it doesn't have enough information to size the result properly) to the correctly sized arrays in JsonExtractScalarFunction made me want to cry ever so slightly.

Please pay attention to the changes to the cache key in DataBlockCache - JSONPath evaluations need to be discriminated by both the JSON Path itself and the default value, on top of the column name and type of the result. This is probably better than having large JSON documents lingering in the cache, but I'm struggling to see this cache as an unequivocal benefit for every possible transformation.

Could we avoid caching JSON path evaluations at the block level and revert the commit? If we find that we spend a lot of time on duplicate JSON path evaluations, could we think about avoiding the planning of duplicate evaluations instead?

@richardstartin richardstartin force-pushed the jsonpath-pushdown branch 2 times, most recently from da1dddf to 4096fa7 Compare December 2, 2021 13:28
@Jackie-Jiang
Copy link
Contributor

Could this be handled differently without the cache by reordering the execution graph to deduplicate common subexpressions?

I agree. Ideally we should deduplicate the expression during the query planning, and have only one transform function for each unique expression. Then we can remove the cache and avoid all the unnecessary overhead.

Towards this direction, I agree we should maintain the buffer within the transform layer and push the buffer down through the data fetcher. We should think of a smooth way to change the interface and move to the end state where all the transform functions (with or without an evaluator) can pass down the buffer to fill the values.

For now, we might just add some read methods into the ProjectionBlock so that it can be used by the TransformFunction? Adding these methods to the Block interface might not match our end goal.

Do you see a path how we can move to the end state without making temporary changes to the interfaces?

@richardstartin
Copy link
Member Author

For now, we might just add some read methods into the ProjectionBlock so that it can be used by the TransformFunction? Adding these methods to the Block interface might not match our end goal.

This is what the PR originally did, I did not modify the Block interface and added methods to the concrete ProjectionBlock, but it was requested above to ensure substitutability of different block types. It feels like I am violating an aspirational design principle with this PR, but as far as I can tell, it has been violated for a long time. A lot of good things will follow on from the ability to tunnel down to the storage, so I would like to find a way to progress here.

@richardstartin
Copy link
Member Author

@Jackie-Jiang please see 83dc232 - it removes pushDown from the Block interface.

@yupeng9
Copy link
Contributor

yupeng9 commented Dec 3, 2021

Curious, do you have some benchmark number to show how much perf improvement?

@richardstartin
Copy link
Member Author

Curious, do you have some benchmark number to show how much perf improvement?

Performance improvement isn't the primary motivation but to provide a plugin mechanism which allows query logic to depend on the available storage. Before merging I would, of course, provide evidence that this does not regress performance.

@kishoreg
Copy link
Member

kishoreg commented Dec 3, 2021

@Jackie-Jiang let's try to get to the final version in a few iterations. I think it's better to keep the changes restricted to projection block for now. Let's get this to work and we can try to revisit the block interface in a different PR.

I am +1 on this PR assuming we will mark the methods in projectionblock as evolving

@richardstartin
Copy link
Member Author

I'll update the Javadoc but I told @siddharthteotia that this would not be merged until he's had a chance to review it.

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly good. I think this is the cleanest solution without changing the existing data fetch mechanisms

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't fully understand the logic here. What is the delegate evaluator used for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is there is a chain of responsibility. There is the standard way of evaluating JSON paths (the result of the refactoring here), but a plugin could be registered which adds special storage as well as an evaluator which can interpret that storage. There could be a mix of segments using the default storage and the plugin storage. On a case by case basis, the registered plugin either handles the encountered storage or delegates if it doesn't know how.

I'm not sure what I was thinking when I wrote this particular line though - DefaultJsonPathEvaluator is the end of the chain and will never delegate, so its delegate will always be null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These interfaces do not belong to the segment-spi, but query-spi which is not available yet. For now we can put them into the pinot-core which contains all the query related interfaces

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can they stay here for now? They need to be implemented by a plugin which ideally would not depend on pinot-core.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it work if we simply put the provider as a private static field and set it during the instance startup? Several classes handle the plugin that way

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is better for a couple of reasons

  1. double registration becomes obvious
  2. various runtime benefits of the provider being final

@siddharthteotia
Copy link
Contributor

I'll update the Javadoc but I told @siddharthteotia that this would not be merged until he's had a chance to review it.

@richardstartin, thanks for waiting. I pulled the changes locally and understood them. Had a few minor questions which I met with @Jackie-Jiang to clarify. LGTM

siddharthteotia
siddharthteotia approved these changes Dec 4, 2021
@Jackie-Jiang Jackie-Jiang merged commit c999a23 into apache:master Dec 6, 2021
kriti-sc pushed a commit to kriti-sc/incubator-pinot that referenced this pull request Dec 12, 2021
Pushes JSON path evaluation down to the storage layer (giving direct access to dictionaries and forward index) which avoids various intermediate materialisations of strings, byte arrays and so on. The benefit to users is the potential to avoid a lot of allocation of large byte[] and String once the JsonPath library can accept UTF-8 encoded byte[]. This also creates an SPI to make the evaluation logic pluggable. The same pushdown mechanism could be abstracted to make extensible to regular expressions etc. in the future.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

10 participants