[optimize] Optimize spark load/broker load reading parquet format file #3877

Open
@xy720

Currently, broker load supports reading Parquet files from remote storage, and soon Spark load will use Parquet files as the BE's loading source.

However, a Parquet file stores its metadata (file metadata, column metadata, page headers, ...) in separate locations, so the broker reader has to seek frequently to fetch data, which results in a large number of RPCs. In a cross-data-center scenario, that many RPCs incur a huge network cost.

The tables below show the large gap in Parquet reading cost between the non-cross-data-center and cross-data-center scenarios.

(The "rpc times" column is the number of seeks each broker performs during loading.)

spark load

| cross-center | rpc times | load time | data size |
|---|---|---|---|
| No | 15014 | 60s | 560 MB |
| Yes | 16817 | 2h | 560 MB |
| No | 169766 | 8min | 5.8 GB |
| Yes | 150476 | 14h | 5.8 GB |

broker load

| cross-center | rpc times | load time | data size |
|---|---|---|---|
| No | 51780 | 2min | 250 MB |
| Yes | 51413 | 42min | 250 MB |
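To put the gap in numbers, the load times reported above can be turned into slowdown factors. This is only arithmetic on the figures in the tables; the helper name `slowdown` and the row labels are made up for illustration.

```python
# Load times from the tables above, converted to seconds:
# (scenario, same-center seconds, cross-center seconds)
LOAD_TIMES = [
    ("spark load, 560 MB", 60, 2 * 3600),
    ("spark load, 5.8 GB", 8 * 60, 14 * 3600),
    ("broker load, 250 MB", 2 * 60, 42 * 60),
]


def slowdown(same_center_s: float, cross_center_s: float) -> float:
    """How many times slower the cross-data-center load is."""
    return cross_center_s / same_center_s


for name, same_s, cross_s in LOAD_TIMES:
    print(f"{name}: {slowdown(same_s, cross_s):.0f}x slower across data centers")
```

The RPC counts are about the same in both scenarios, so the slowdown comes from the cost of each round trip rather than from extra requests.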

As a proposal, I suggest adding a cache buffer array to the broker reader's read path.

Illustration:
When a broker is about to seek to a position and fetch data from the remote Parquet file, it first tries to read that position from the cache buffer array. If the expected data is already in the cache buffer array, there is no need to read it from the remote Parquet file.
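The read path described above can be sketched roughly as follows. This is a minimal illustration in Python, not the actual broker reader code: the class name `CachedRangeReader`, the `remote_read(offset, length)` callback, the round-robin slot replacement, and the choice to fill each buffer starting at the requested offset are all assumptions made for the example.

```python
from typing import Callable, List, Optional, Tuple


class CachedRangeReader:
    """Hypothetical reader that serves reads from a small array of buffers."""

    def __init__(self, remote_read: Callable[[int, int], bytes], file_size: int,
                 buffer_size: int = 512 * 1024, num_buffers: int = 4) -> None:
        self._remote_read = remote_read      # one call == one RPC to the broker
        self._file_size = file_size
        self._buffer_size = buffer_size
        # Each slot holds (start_offset, data) or None when unused.
        self._slots: List[Optional[Tuple[int, bytes]]] = [None] * num_buffers
        self._next_slot = 0                  # round-robin replacement (assumption)
        self.rpc_count = 0

    def _find(self, offset: int) -> Optional[Tuple[int, bytes]]:
        """Return the buffer that contains `offset`, if any."""
        for slot in self._slots:
            if slot is not None and slot[0] <= offset < slot[0] + len(slot[1]):
                return slot
        return None

    def _fill(self, offset: int) -> Tuple[int, bytes]:
        """Fetch one buffer-sized range starting at `offset` from the remote file."""
        length = min(self._buffer_size, self._file_size - offset)
        slot = (offset, self._remote_read(offset, length))
        self.rpc_count += 1
        self._slots[self._next_slot] = slot
        self._next_slot = (self._next_slot + 1) % len(self._slots)
        return slot

    def read_at(self, offset: int, length: int) -> bytes:
        """Read `length` bytes at `offset`, going remote only on a cache miss."""
        end = min(offset + length, self._file_size)
        out = bytearray()
        while offset < end:
            start, data = self._find(offset) or self._fill(offset)
            chunk = data[offset - start:end - start]
            if not chunk:
                raise IOError("remote read returned no data")
            out += chunk
            offset += len(chunk)
        return bytes(out)


# Usage with an in-memory stand-in for the remote file.
blob = bytes(range(256)) * 4096  # 1 MiB of fake file content
reader = CachedRangeReader(lambda off, n: blob[off:off + n], len(blob),
                           buffer_size=128 * 1024)
for pos in range(0, 64 * 1024, 1024):  # 64 small reads at different positions
    reader.read_at(pos, 100)
print(reader.rpc_count)  # 1: the first read fills a buffer that serves the rest
```

Without the buffer array, each of the 64 small reads in the usage example would be its own remote read.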

Our final goal is to reduce the number of RPCs as much as possible, so I ran some tests to validate the idea.

Test:
The test data is the SSB lineorder table; the target table is unpartitioned with 8 buckets.

| Field | Type | Null | Key | Default | Extra |
|---|---|---|---|---|---|
| lo_orderkey | BIGINT | Yes | true | N/A | |
| lo_linenumber | BIGINT | Yes | true | N/A | |
| lo_custkey | INT | Yes | true | N/A | |
| lo_partkey | INT | Yes | true | N/A | |
| lo_suppkey | INT | Yes | true | N/A | |
| lo_orderdate | INT | Yes | true | N/A | |
| lo_orderpriotity | VARCHAR(16) | Yes | false | N/A | REPLACE |
| lo_shippriotity | INT | Yes | false | N/A | SUM |
| lo_quantity | BIGINT | Yes | false | N/A | SUM |
| lo_extendedprice | BIGINT | Yes | false | N/A | SUM |
| lo_ordtotalprice | BIGINT | Yes | false | N/A | SUM |
| lo_discount | BIGINT | Yes | false | N/A | SUM |
| lo_revenue | BIGINT | Yes | false | N/A | SUM |
| lo_supplycost | BIGINT | Yes | false | N/A | SUM |
| lo_tax | BIGINT | Yes | false | N/A | SUM |
| lo_commitdate | BIGINT | Yes | false | N/A | SUM |
| lo_shipmode | VARCHAR(11) | Yes | false | N/A | REPLACE |

spark load

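| cross-center | rpc times | load time | data size | buffer size |
|---|---|---|---|---|
| Yes | 16817 | 2h | 560 MB | N/A |
| Yes | 613 | 3min | 560 MB | 128 KB |
| Yes | 529 | 3min | 560 MB | 512 KB |
| Yes | 285 | 3min | 560 MB | 1 MB |

| cross-center | rpc times | load time | data size | buffer size |
|---|---|---|---|---|
| No | 15014 | 60s | 560 MB | N/A |
| No | 634 | 33s | 560 MB | 128 KB |
| No | 526 | 29s | 560 MB | 512 KB |
| No | 423 | 27s | 560 MB | 1 MB |

| cross-center | rpc times | load time | data size | buffer size |
|---|---|---|---|---|
| Yes | 150476 | 14h | 5.8 GB | N/A |
| Yes | 6547 | 45min | 5.8 GB | 128 KB |
| Yes | 5670 | 37min | 5.8 GB | 512 KB |
| Yes | 5598 | 41min | 5.8 GB | 1 MB |

| cross-center | rpc times | load time | data size | buffer size |
|---|---|---|---|---|
| No | 169766 | 8min | 5.8 GB | N/A |
| No | 6773 | 5min | 5.8 GB | 128 KB |
| No | 5667 | 4min | 5.8 GB | 512 KB |
| No | 5879 | 5min | 5.8 GB | 1 MB |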

broker load

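| cross-center | rpc times | load time | data size | buffer size |
|---|---|---|---|---|
| Yes | 51413 | 42min | 250 MB | N/A |
| Yes | 1873 | 20min | 250 MB | 512 KB |

| cross-center | rpc times | load time | data size | buffer size |
|---|---|---|---|---|
| No | 51780 | 2min | 250 MB | N/A |
| No | 1798 | 55s | 250 MB | 512 KB |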

The results show that, whether the load is cross-data-center or not, adding the cache buffer array reduces the number of RPCs and improves loading performance.
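For a rough sense of scale, the RPC counts from the test results can be compared directly. The snippet below only divides the reported numbers for the 512k buffer size (the one size tested in every scenario); the function name `reduction_factor` and the row labels are made up for illustration.

```python
# (scenario, rpc times without buffer, rpc times with a 512k buffer),
# copied from the test results above.
RPC_COUNTS = [
    ("spark load, 560 MB, cross-center", 16817, 529),
    ("spark load, 560 MB, same center", 15014, 526),
    ("spark load, 5.8 GB, cross-center", 150476, 5670),
    ("spark load, 5.8 GB, same center", 169766, 5667),
    ("broker load, 250 MB, cross-center", 51413, 1873),
    ("broker load, 250 MB, same center", 51780, 1798),
]


def reduction_factor(before: int, after: int) -> float:
    """How many times fewer RPCs were issued with the buffer enabled."""
    return before / after


for name, before, after in RPC_COUNTS:
    print(f"{name}: {reduction_factor(before, after):.1f}x fewer RPCs")
```

With the 512k buffer, every scenario lands between roughly 26x and 32x fewer RPCs than the unbuffered run.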

There may still be room for improvement in the cache policy, such as an LRU cache or something else; we should try that gradually later.
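As one possible direction, an LRU policy over fixed-size blocks could look roughly like the sketch below. It is not part of the tested change; `LruBlockCache`, the `remote_read(offset, length)` callback, the block alignment, and the default sizes are all hypothetical.

```python
from collections import OrderedDict
from typing import Callable


class LruBlockCache:
    """Hypothetical block cache that evicts the least recently used block."""

    def __init__(self, remote_read: Callable[[int, int], bytes], file_size: int,
                 block_size: int = 512 * 1024, capacity: int = 8) -> None:
        self._remote_read = remote_read  # one call == one RPC
        self._file_size = file_size
        self._block_size = block_size
        self._capacity = capacity
        self._blocks: "OrderedDict[int, bytes]" = OrderedDict()  # block index -> data
        self.rpc_count = 0

    def _get_block(self, index: int) -> bytes:
        if index in self._blocks:
            self._blocks.move_to_end(index)   # mark as most recently used
            return self._blocks[index]
        start = index * self._block_size
        length = min(self._block_size, self._file_size - start)
        data = self._remote_read(start, length)
        self.rpc_count += 1
        self._blocks[index] = data
        if len(self._blocks) > self._capacity:
            self._blocks.popitem(last=False)  # drop the least recently used block
        return data

    def read_at(self, offset: int, length: int) -> bytes:
        """Read `length` bytes at `offset`, fetching only the missing blocks."""
        end = min(offset + length, self._file_size)
        out = bytearray()
        while offset < end:
            index = offset // self._block_size
            start = index * self._block_size
            chunk = self._get_block(index)[offset - start:end - start]
            if not chunk:
                raise IOError("remote read returned no data")
            out += chunk
            offset += len(chunk)
        return bytes(out)
```

Compared with a fixed buffer array, this keeps blocks that are revisited often (for example, file metadata) cached for longer, at the cost of some extra bookkeeping per read.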
