Skip to content

Commit

Permalink
[opt](scan) read scan ranges in the order of partitions (apache#33515)
Browse files Browse the repository at this point in the history
Follow apache#33410. scan_ranges are already sorted by path(as well as partition path) in FE, so merge scan ranges in order, not round robin.
In the insert statement, reading data in partition order can reduce the memory usage of BE and prevent the generation of smaller tables.
  • Loading branch information
AshinGau authored Apr 15, 2024
1 parent 5137342 commit f5739b7
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 41 deletions.
36 changes: 25 additions & 11 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,34 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of threads in thread pool.
_scan_ranges.clear();
auto range_iter = scan_ranges.begin();
for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
_scan_ranges.push_back(*range_iter);
// scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
// In the insert statement, reading data in partition order can reduce the memory usage of BE
// and prevent the generation of smaller tables.
_scan_ranges.resize(max_scanners);
int num_ranges = scan_ranges.size() / max_scanners;
int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
int scan_index = 0;
int range_index = 0;
for (int i = 0; i < num_add_one; ++i) {
_scan_ranges[scan_index] = scan_ranges[range_index++];
auto& ranges =
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
for (int j = 0; j < num_ranges; j++) {
auto& merged_ranges =
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
}
for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
if (i == max_scanners) {
i = 0;
for (int i = num_add_one; i < max_scanners; ++i) {
_scan_ranges[scan_index] = scan_ranges[range_index++];
auto& ranges =
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
for (int j = 0; j < num_ranges - 1; j++) {
auto& merged_ranges =
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
_scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
if (scan_ranges.size() > 0 &&
Expand Down
36 changes: 25 additions & 11 deletions be/src/vec/exec/scan/new_file_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,34 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of threads in thread pool.
_scan_ranges.clear();
auto range_iter = scan_ranges.begin();
for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
_scan_ranges.push_back(*range_iter);
// scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
// In the insert statement, reading data in partition order can reduce the memory usage of BE
// and prevent the generation of smaller tables.
_scan_ranges.resize(max_scanners);
int num_ranges = scan_ranges.size() / max_scanners;
int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
int scan_index = 0;
int range_index = 0;
for (int i = 0; i < num_add_one; ++i) {
_scan_ranges[scan_index] = scan_ranges[range_index++];
auto& ranges =
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
for (int j = 0; j < num_ranges; j++) {
auto& merged_ranges =
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
}
for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
if (i == max_scanners) {
i = 0;
for (int i = num_add_one; i < max_scanners; ++i) {
_scan_ranges[scan_index] = scan_ranges[range_index++];
auto& ranges =
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
for (int j = 0; j < num_ranges - 1; j++) {
auto& merged_ranges =
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
_scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
if (scan_ranges.size() > 0 &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,24 +211,18 @@ public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) {
this.enableSplitsRedistribution = enableSplitsRedistribution;
}

/**
* Assign splits to each backend. Ensure that each backend receives a similar amount of data.
* In order to make sure backends utilize the os page cache as much as possible, and all backends read splits
* in the order of partitions(reading data in partition order can reduce the memory usage of backends),
* splits should be sorted by path.
* Fortunately, the process of obtaining splits ensures that the splits have been sorted according to the path.
* If the splits are unordered, it is strongly recommended to sort them before calling this function.
*/
public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException {
// Sorting splits is to ensure that the same query utilizes the os page cache as much as possible.
splits.sort((split1, split2) -> {
int pathComparison = split1.getPathString().compareTo(split2.getPathString());
if (pathComparison != 0) {
return pathComparison;
}

int startComparison = Long.compare(split1.getStart(), split2.getStart());
if (startComparison != 0) {
return startComparison;
}
return Long.compare(split1.getLength(), split2.getLength());
});

ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();

List<Split> remainingSplits = null;
List<Split> remainingSplits;

List<Backend> backends = new ArrayList<>();
for (List<Backend> backendList : backendMap.values()) {
Expand All @@ -242,8 +236,7 @@ public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) t
// locality information
if (Config.split_assigner_optimized_local_scheduling) {
remainingSplits = new ArrayList<>(splits.size());
for (int i = 0; i < splits.size(); ++i) {
Split split = splits.get(i);
for (Split split : splits) {
if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) {
List<Backend> candidateNodes = selectExactNodes(backendMap, split.getHosts());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,21 @@ public SystemInfoService getCurrentSystemInfo() {

}

public static void sortSplits(List<Split> splits) {
splits.sort((split1, split2) -> {
int pathComparison = split1.getPathString().compareTo(split2.getPathString());
if (pathComparison != 0) {
return pathComparison;
}

int startComparison = Long.compare(split1.getStart(), split2.getStart());
if (startComparison != 0) {
return startComparison;
}
return Long.compare(split1.getLength(), split2.getLength());
});
}

@Test
public void testGenerateRandomly() throws UserException {
SystemInfoService service = new SystemInfoService();
Expand Down Expand Up @@ -367,7 +382,7 @@ public SystemInfoService getCurrentSystemInfo() {
List<Split> totalSplits = new ArrayList<>();
totalSplits.addAll(remoteSplits);
totalSplits.addAll(localSplits);
Collections.shuffle(totalSplits);
sortSplits(totalSplits);
Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
if (i == 0) {
result = ArrayListMultimap.create(assignment);
Expand Down Expand Up @@ -489,7 +504,7 @@ public SystemInfoService getCurrentSystemInfo() {
List<Split> totalSplits = new ArrayList<>();
totalSplits.addAll(remoteSplits);
totalSplits.addAll(localSplits);
Collections.shuffle(totalSplits);
sortSplits(totalSplits);
Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
if (i == 0) {
result = ArrayListMultimap.create(assignment);
Expand Down

0 comments on commit f5739b7

Please sign in to comment.