
Segment processing framework #5934

Merged: 12 commits into apache:master, Sep 15, 2020
Conversation

npawar
Contributor

@npawar npawar commented Aug 27, 2020

Description

#5753
A Segment Processing Framework to convert "m" input segments into "n" output segments.
The phases of the Segment Processor are:

  1. Map
  • Record transformation (using transform functions)
  • Record filtering (using filter functions)
  • Partitioning (column value based, transform function based, or based on the table config's partition config)
  2. Reduce
  • Rollup/concat records
  • Split into parts
  • Sort
  3. Segment generation
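The map and reduce phases above can be sketched conceptually in Python. This is an illustrative sketch of the data flow only, not the actual Java implementation; all function and parameter names here are hypothetical:

```python
from collections import defaultdict

def map_phase(records, transform, keep, partition_key):
    """Map stage: transform, filter, and partition input records."""
    partitions = defaultdict(list)
    for record in records:
        record = transform(record)           # record transformation
        if not keep(record):                 # record filtering
            continue
        partitions[partition_key(record)].append(record)
    return partitions

def reduce_phase(partitions, rollup, max_records_per_part):
    """Reduce stage: rollup records per partition, sort, split into parts.
    One segment would then be generated per part."""
    parts = []
    for _, records in sorted(partitions.items()):
        records = rollup(records)            # e.g. sum metrics for equal keys
        records.sort(key=lambda r: r["epochMillis"])
        for i in range(0, len(records), max_records_per_part):
            parts.append(records[i:i + max_records_per_part])
    return parts
```

Each part coming out of the reduce stage is independently fed to segment generation, which is what bounds segment size.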

A SegmentProcessorFrameworkCommand is provided to run this on demand:
bin/pinot-admin.sh SegmentProcessorFramework -segmentProcessorFrameworkSpec /<path>/spec.json
where spec.json is:

{
  "inputSegmentsDir": "/<base_dir>/segmentsDir",
  "outputSegmentsDir": "/<base_dir>/outputDir/",
  "schemaFile": "/<base_dir>/schema.json",
  "tableConfigFile": "/<base_dir>/table.json",
  "recordTransformerConfig": {
    "transformFunctionsMap": {
      "epochMillis": "round(epochMillis, 86400000)" // round to nearest day
    }
  },
  "recordFilterConfig": {
    "recordFilterType": "FILTER_FUNCTION",
    "filterFunction": "Groovy({epochMillis != \"1597795200000\"}, epochMillis)"
  },
  "partitioningConfig": {
    "partitionerType": "COLUMN_VALUE", // partition on epochMillis
    "columnName": "epochMillis"
  },
  "collectorConfig": {
    "collectorType": "ROLLUP", // rollup clicks by summing
    "aggregatorTypeMap": {
      "clicks": "SUM"
    }
  },
  "segmentConfig": {
    "maxNumRecordsPerSegment": 200000
  }
}
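For instance, the `round(epochMillis, 86400000)` transform in the spec buckets millisecond timestamps into days (86400000 ms = 1 day). Assuming floor semantics, i.e. rounding down to the bucket boundary, it behaves like this small sketch (the actual transform function's rounding behavior should be confirmed against Pinot's docs):

```python
def round_to_bucket(value_millis, bucket_millis=86_400_000):
    """Round an epoch-millis timestamp down to its bucket boundary.
    With the default bucket of 86400000 ms, this buckets values by day."""
    return (value_millis // bucket_millis) * bucket_millis
```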

Note:

  1. Currently this framework does not parallelize the map/reduce/segment-creation jobs: each input file is processed sequentially in the map stage, each part is processed sequentially in the reduce stage, and each segment is built one after another. We can change this in the future if the need arises to make this more advanced.
  2. The framework assumes there is enough memory to hold all records of a partition in memory during rollups in the reducer. As a safety measure, a limit of 5M records has been set on the reducer as the number of records to collect before forcing a flush. In the future we could consider off-heap processing if memory becomes a problem.
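The 5M-record safety valve amounts to a bounded in-memory collector that signals a flush once its size limit is hit. A hypothetical sketch of that idea (illustrative names only, not the actual Collector API):

```python
class BoundedRollupCollector:
    """Collect rolled-up records in memory; signal a flush once the
    number of distinct keys reaches max_records (e.g. 5_000_000)."""

    def __init__(self, max_records):
        self.max_records = max_records
        self.aggregates = {}

    def collect(self, key, clicks):
        """Aggregate clicks by key; return True when a flush is needed."""
        self.aggregates[key] = self.aggregates.get(key, 0) + clicks
        return len(self.aggregates) >= self.max_records

    def flush(self):
        """Emit the sorted aggregates and reset the in-memory state."""
        records = sorted(self.aggregates.items())
        self.aggregates.clear()
        return records
```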

This framework will typically be used by minion tasks that want to perform some processing on segments (e.g. a task that merges segments, or a task that aligns segments to time boundaries). The existing segment merge jobs can be changed to use this framework.

Pending
Enhancements (TODOs added in code):

  • Put null in GenericRecord if nullValueFields contains the field
  • Interface out the underlying file format (currently Avro)
  • Dedup
  • Use an off-heap based implementation for aggregation/sorting in the reduce stage
  • Two-step partitioner: 1) apply the custom partitioner, 2) apply the table config partitioner; combine both to get the final partition
  • Configs for segment name (like prefix)
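The proposed two-step partitioner would compose the two partition values into one final partition id. A hypothetical sketch of that composition (names and the key format are illustrative assumptions, not the planned implementation):

```python
def composite_partition(record, custom_partitioner, table_config_partitioner):
    """Apply the custom partitioner, then the table-config partitioner,
    and combine both values into a single final partition id."""
    p1 = custom_partitioner(record)
    p2 = table_config_partitioner(record)
    return f"{p1}_{p2}"
```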

@npawar npawar requested review from Jackie-Jiang and snleee August 27, 2020 22:31
@npawar npawar force-pushed the segment_processing_framework branch from 80e1c4d to eece981 Compare August 27, 2020 22:39
@codecov-commenter
Codecov Report

Merging #5934 into master will decrease coverage by 23.23%.
The diff coverage is 51.23%.


@@             Coverage Diff             @@
##           master    #5934       +/-   ##
===========================================
- Coverage   66.44%   43.20%   -23.24%     
===========================================
  Files        1075     1210      +135     
  Lines       54773    62540     +7767     
  Branches     8168     9529     +1361     
===========================================
- Hits        36396    27023     -9373     
- Misses      15700    33081    +17381     
+ Partials     2677     2436      -241     
Flag Coverage Δ
#integration 43.20% <51.23%> (?)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...ot/broker/broker/AllowAllAccessControlFactory.java 100.00% <ø> (ø)
.../helix/BrokerUserDefinedMessageHandlerFactory.java 52.83% <0.00%> (-13.84%) ⬇️
...ava/org/apache/pinot/client/AbstractResultSet.java 26.66% <0.00%> (-30.48%) ⬇️
.../main/java/org/apache/pinot/client/Connection.java 22.22% <0.00%> (-26.62%) ⬇️
.../org/apache/pinot/client/ResultTableResultSet.java 24.00% <0.00%> (-10.29%) ⬇️
.../org/apache/pinot/common/lineage/LineageEntry.java 0.00% <0.00%> (ø)
...apache/pinot/common/lineage/LineageEntryState.java 0.00% <0.00%> (ø)
...rg/apache/pinot/common/lineage/SegmentLineage.java 0.00% <0.00%> (ø)
...ache/pinot/common/lineage/SegmentLineageUtils.java 0.00% <0.00%> (ø)
...ot/common/messages/RoutingTableRebuildMessage.java 0.00% <0.00%> (ø)
... and 1132 more

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4fd70fe...eece981.

Contributor

@snleee snleee left a comment

some comments for high-level discussion

@npawar npawar force-pushed the segment_processing_framework branch from 2c642d6 to eece981 Compare September 1, 2020 17:58
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Good job splitting the work into multiple modules and making it very easy to extend. Mostly minor comments.
One high level comment: since we always use Avro as the intermediate format, should we directly work on GenericRecord instead of converting back and forth between GenericRow and GenericRecord?
Also, we might want to support more input formats other than Pinot segments. We can do it as the next step.

Contributor

@snleee snleee left a comment

Overall, I like that all the core components are interfaced out and easy to extend. I have put some comments. Some of them are questions or points that I would like to discuss.

@npawar
Contributor Author

npawar commented Sep 15, 2020

Overall, I like that all the core components are interfaced out and easy to extend. I have put some comments. Some of them are questions or points that I would like to discuss.

Addressed the comments. Added TODOs in the code and description for those that will be handled in future PRs.

Contributor

@snleee snleee left a comment

LGTM. Thank you for addressing all the comments!

@npawar npawar merged commit 41de9a6 into apache:master Sep 15, 2020
@npawar npawar deleted the segment_processing_framework branch September 15, 2020 16:49