Description
Currently, broker load supports reading parquet files from remote storage, and soon we will use parquet files as the BE's loading source in spark load.
However, because a parquet file's metadata (file meta / column meta / page header ...) is scattered across the file, the broker reader has to seek frequently to get data, which leads to a lot of RPCs. A large number of RPCs in turn causes huge network costs in the cross-data-center scenario.
The tables below show the big cost gap in parquet reading between the non-cross-data-center and cross-data-center scenarios.
(rpc times = the number of seeks each broker performs during loading)
spark load
cross-center | rpc times | load time | data size |
---|---|---|---|
No | 15014 | 60s | 560 MB |
Yes | 16817 | 2h | 560 MB |
No | 169766 | 8min | 5.8 GB |
Yes | 150476 | 14h | 5.8 GB |
broker load
cross-center | rpc times | load time | data size |
---|---|---|---|
No | 51780 | 2min | 250 MB |
Yes | 51413 | 42min | 250 MB |
As a proposal, I suggest adding a cache buffer array to the broker reader's reading process.
Illustration:
When the broker reader is about to seek to a position and fetch data from the remote parquet file, it first tries to read that position from the cache buffer array. If the expected data hits the cache buffer array, we don't need to read from the remote parquet file at all.
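To make the illustration concrete, here is a minimal sketch of such a read-through buffer, assuming an underlying remote reader where every `pread` call costs one RPC. All names here (`RemoteReader`, `BufferedReader`, `read_at`, ...) are hypothetical, not the actual Doris code:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <vector>

// Assumed interface of the underlying remote file: every pread is one RPC.
struct RemoteReader {
    // Reads up to `len` bytes at `offset`; returns the bytes actually read.
    virtual int64_t pread(int64_t offset, uint8_t* buf, int64_t len) = 0;
    virtual ~RemoteReader() = default;
};

// Read-through cache: small, nearby reads (parquet footer, column metadata,
// page headers) are served from one buffered range instead of one RPC each.
class BufferedReader {
public:
    BufferedReader(RemoteReader* remote, int64_t buffer_size)
            : _remote(remote), _buffer(buffer_size) {}

    int64_t read_at(int64_t offset, uint8_t* out, int64_t len) {
        int64_t copied = 0;
        while (copied < len) {
            if (!hit(offset + copied)) {
                // Cache miss: one RPC refills the whole buffer from here.
                _buf_offset = offset + copied;
                _buf_size = _remote->pread(_buf_offset, _buffer.data(),
                                           static_cast<int64_t>(_buffer.size()));
                if (_buf_size <= 0) break;  // EOF or read error
            }
            int64_t pos = offset + copied - _buf_offset;
            int64_t n = std::min(len - copied, _buf_size - pos);
            std::memcpy(out + copied, _buffer.data() + pos, n);
            copied += n;
        }
        return copied;
    }

private:
    bool hit(int64_t offset) const {
        return offset >= _buf_offset && offset < _buf_offset + _buf_size;
    }

    RemoteReader* _remote;
    std::vector<uint8_t> _buffer;  // the cache buffer
    int64_t _buf_offset = 0;       // file offset of _buffer[0]
    int64_t _buf_size = 0;         // number of valid bytes in _buffer
};
```

The expectation is that a single RPC now serves many consecutive small reads of page headers and page data; the tests below try to validate that.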
Our final goal is to reduce the number of RPCs as much as we can, so I ran some tests to validate the idea.
Test:
The test data I used is the SSB lineorder dataset; the target table is unpartitioned with 8 buckets.
Field | Type | Null | Key | Default | Extra |
---|---|---|---|---|---|
lo_orderkey | BIGINT | Yes | true | N/A | |
lo_linenumber | BIGINT | Yes | true | N/A | |
lo_custkey | INT | Yes | true | N/A | |
lo_partkey | INT | Yes | true | N/A | |
lo_suppkey | INT | Yes | true | N/A | |
lo_orderdate | INT | Yes | true | N/A | |
lo_orderpriotity | VARCHAR(16) | Yes | false | N/A | REPLACE |
lo_shippriotity | INT | Yes | false | N/A | SUM |
lo_quantity | BIGINT | Yes | false | N/A | SUM |
lo_extendedprice | BIGINT | Yes | false | N/A | SUM |
lo_ordtotalprice | BIGINT | Yes | false | N/A | SUM |
lo_discount | BIGINT | Yes | false | N/A | SUM |
lo_revenue | BIGINT | Yes | false | N/A | SUM |
lo_supplycost | BIGINT | Yes | false | N/A | SUM |
lo_tax | BIGINT | Yes | false | N/A | SUM |
lo_commitdate | BIGINT | Yes | false | N/A | SUM |
lo_shipmode | VARCHAR(11) | Yes | false | N/A | REPLACE |
spark load
cross-center | rpc times | load time | data size | buffer size |
---|---|---|---|---|
Yes | 16817 | 2h | 560 MB | N/A |
Yes | 613 | 3min | 560 MB | 128 KB |
Yes | 529 | 3min | 560 MB | 512 KB |
Yes | 285 | 3min | 560 MB | 1 MB |

cross-center | rpc times | load time | data size | buffer size |
---|---|---|---|---|
No | 15014 | 60s | 560 MB | N/A |
No | 634 | 33s | 560 MB | 128 KB |
No | 526 | 29s | 560 MB | 512 KB |
No | 423 | 27s | 560 MB | 1 MB |

cross-center | rpc times | load time | data size | buffer size |
---|---|---|---|---|
Yes | 150476 | 14h | 5.8 GB | N/A |
Yes | 6547 | 45min | 5.8 GB | 128 KB |
Yes | 5670 | 37min | 5.8 GB | 512 KB |
Yes | 5598 | 41min | 5.8 GB | 1 MB |

cross-center | rpc times | load time | data size | buffer size |
---|---|---|---|---|
No | 169766 | 8min | 5.8 GB | N/A |
No | 6773 | 5min | 5.8 GB | 128 KB |
No | 5667 | 4min | 5.8 GB | 512 KB |
No | 5879 | 5min | 5.8 GB | 1 MB |
broker load
cross-center | rpc times | load time | data size | buffer size |
---|---|---|---|---|
Yes | 51413 | 42min | 250 MB | N/A |
Yes | 1873 | 20min | 250 MB | 512 KB |

cross-center | rpc times | load time | data size | buffer size |
---|---|---|---|---|
No | 51780 | 2min | 250 MB | N/A |
No | 1798 | 55s | 250 MB | 512 KB |
It can be seen that, whether cross-data-center or not, adding the cache buffer array reduces the number of RPCs and improves loading performance.
There may still be room for improvement in the cache policy, such as an LRU cache or something else; we can try that gradually later, as sketched below.
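For instance, the single buffer above could be generalized into an LRU cache of fixed-size blocks keyed by file offset, so that revisited metadata ranges stay cached. A hypothetical sketch, with all names illustrative:

```cpp
#include <cstdint>
#include <list>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical LRU cache of fixed-size file blocks, keyed by block offset.
// On a miss the caller fetches the block via one RPC and calls put(); on a
// hit no RPC is needed and the block becomes the most recently used.
class LruBlockCache {
public:
    explicit LruBlockCache(size_t capacity) : _capacity(capacity) {}

    // Returns the cached block for `block_offset`, or nullptr on a miss.
    std::vector<uint8_t>* get(int64_t block_offset) {
        auto it = _index.find(block_offset);
        if (it == _index.end()) return nullptr;
        // Move the entry to the front of the list: most recently used.
        _lru.splice(_lru.begin(), _lru, it->second);
        return &it->second->second;
    }

    // Inserts a block, evicting the least recently used one when full.
    void put(int64_t block_offset, std::vector<uint8_t> data) {
        if (_index.count(block_offset) > 0) return;  // already cached
        if (_lru.size() >= _capacity) {
            _index.erase(_lru.back().first);         // evict the LRU block
            _lru.pop_back();
        }
        _lru.emplace_front(block_offset, std::move(data));
        _index[block_offset] = _lru.begin();
    }

private:
    using Entry = std::pair<int64_t, std::vector<uint8_t>>;
    const size_t _capacity;  // max number of cached blocks
    std::list<Entry> _lru;   // MRU at the front, LRU at the back
    std::unordered_map<int64_t, std::list<Entry>::iterator> _index;
};
```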