Skip to content

Commit

Permalink
[BrokerLoad] Improve the file assignment strategy of scan range locat…
Browse files Browse the repository at this point in the history
…ions (#185)

Current strategy:
Create a TScanRangeLocations, fill it up, and then create the next
TScanRangeLocations. Because the Parquet and ORC formats cannot be
split, each file can only be assigned to one TScanRangeLocations,
which causes the final instance number to be less than planned,
and the last TScanRangeLocations to hold a relatively small amount
of data.

Improved strategy:
First create n locations, and then add files to the locations one by
one. When assigning a location for each file, select the location
with the least allocated data each time.
  • Loading branch information
wyb authored Sep 15, 2021
1 parent 664c648 commit 5b39560
Show file tree
Hide file tree
Showing 2 changed files with 356 additions and 41 deletions.
96 changes: 55 additions & 41 deletions fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.util.BrokerUtil;
import com.starrocks.load.BrokerFileGroup;
Expand Down Expand Up @@ -74,6 +75,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -96,14 +98,16 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) {
}
}

private static final Comparator SCAN_RANGE_LOCATIONS_COMPARATOR =
(Comparator<Pair<TScanRangeLocations, Long>>) (o1, o2) -> Long.compare(o1.second, o2.second);

private final Random random = new Random(System.currentTimeMillis());

// File groups need to
private List<TScanRangeLocations> locationsList;
private PriorityQueue<Pair<TScanRangeLocations, Long>> locationsHeap;

// used both for load statement and select statement
private long totalBytes;
private int numInstances;
private int parallelInstanceNum;
private long bytesPerInstance;

Expand Down Expand Up @@ -421,7 +425,7 @@ private void getFileStatusAndCalcInstance() throws UserException {
throw new UserException("No source file in this table(" + targetTable.getName() + ").");
}

totalBytes = 0;
long totalBytes = 0;
for (List<TBrokerFileStatus> fileStatuses : fileStatusesList) {
Collections.sort(fileStatuses, T_BROKER_FILE_STATUS_COMPARATOR);
for (TBrokerFileStatus fileStatus : fileStatuses) {
Expand All @@ -438,7 +442,7 @@ private void getFileStatusAndCalcInstance() throws UserException {
numInstances = Math.min(numInstances, Config.max_broker_concurrency);
numInstances = Math.max(1, numInstances);

bytesPerInstance = totalBytes / numInstances + ((totalBytes % numInstances == 0) ? 0 : 1);
bytesPerInstance = (totalBytes + numInstances - 1) / numInstances;
}

private void assignBackends() throws UserException {
Expand Down Expand Up @@ -492,56 +496,47 @@ private void processFileGroup(
return;
}

TScanRangeLocations curLocations = newLocations(context.params, brokerDesc.getName());
long curInstanceBytes = 0;
// Create locations for file group
createScanRangeLocations(context, fileStatuses);

// Add files to locations with less allocated data
Pair<TScanRangeLocations, Long> smallestLocations = null;
long curFileOffset = 0;
for (int i = 0; i < fileStatuses.size(); ) {
TBrokerFileStatus fileStatus = fileStatuses.get(i);
long leftBytes = fileStatus.size - curFileOffset;
long tmpBytes = curInstanceBytes + leftBytes;
TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
context.fileGroup.getColumnsFromPath());
int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size();
if (tmpBytes >= bytesPerInstance) {
// Now only support split plain text
if (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) {
long rangeBytes = bytesPerInstance - curInstanceBytes;
TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType,
rangeBytes, columnsFromPath, numberOfColumnsFromFile);
brokerScanRange(curLocations).addToRanges(rangeDesc);
// This csv file reach end of file
if (tmpBytes == bytesPerInstance) {
curFileOffset = 0;
i++;
} else {
curFileOffset += rangeBytes;
}
} else {
TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType,
leftBytes, columnsFromPath, numberOfColumnsFromFile);
brokerScanRange(curLocations).addToRanges(rangeDesc);
curFileOffset = 0;
i++;
}

// New one scan
locationsList.add(curLocations);
curLocations = newLocations(context.params, brokerDesc.getName());
curInstanceBytes = 0;
smallestLocations = locationsHeap.poll();
long leftBytes = fileStatus.size - curFileOffset;
long rangeBytes = 0;
if (smallestLocations.second + leftBytes > bytesPerInstance &&
(formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)) {
// Now only support split plain text
rangeBytes = bytesPerInstance - smallestLocations.second;
curFileOffset += rangeBytes;
} else {
TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType,
leftBytes, columnsFromPath, numberOfColumnsFromFile);
brokerScanRange(curLocations).addToRanges(rangeDesc);
rangeBytes = leftBytes;
curFileOffset = 0;
curInstanceBytes += leftBytes;
i++;
}

TBrokerRangeDesc rangeDesc =
createBrokerRangeDesc(curFileOffset, fileStatus, formatType, rangeBytes, columnsFromPath,
numberOfColumnsFromFile);
brokerScanRange(smallestLocations.first).addToRanges(rangeDesc);
smallestLocations.second += rangeBytes;
locationsHeap.add(smallestLocations);
}

// Put the last file
if (brokerScanRange(curLocations).isSetRanges()) {
locationsList.add(curLocations);
// Put locations with valid scan ranges to locationsList
while (!locationsHeap.isEmpty()) {
TScanRangeLocations locations = locationsHeap.poll().first;
if (brokerScanRange(locations).isSetRanges()) {
locationsList.add(locations);
}
}
}

Expand All @@ -561,9 +556,25 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt
return rangeDesc;
}

// Pre-creates the scan range locations for one file group and seeds the
// min-heap, so that files can later be assigned to the location with the
// least allocated bytes. Each entry pairs a TScanRangeLocations with its
// currently assigned byte count (initially 0).
private void createScanRangeLocations(ParamCreateContext context, List<TBrokerFileStatus> fileStatuses)
        throws UserException {
    Preconditions.checkState(locationsHeap.isEmpty(), "Locations heap is not empty");

    // Total bytes of this file group decides how many locations we need.
    long groupBytes = 0;
    for (TBrokerFileStatus status : fileStatuses) {
        groupBytes += status.size;
    }
    // Ceiling division: one location per bytesPerInstance-sized slice.
    long locationsNum = (groupBytes + bytesPerInstance - 1) / bytesPerInstance;

    for (long idx = 0; idx < locationsNum; ++idx) {
        locationsHeap.add(Pair.create(newLocations(context.params, brokerDesc.getName()), 0L));
    }
}

@Override
public void finalize(Analyzer analyzer) throws UserException {
locationsList = Lists.newArrayList();
locationsHeap = new PriorityQueue<>(SCAN_RANGE_LOCATIONS_COMPARATOR);

for (int i = 0; i < fileGroups.size(); ++i) {
List<TBrokerFileStatus> fileStatuses = fileStatusesList.get(i);
Expand All @@ -578,12 +589,15 @@ public void finalize(Analyzer analyzer) throws UserException {
}
processFileGroup(context, fileStatuses);
}

// update numInstances
numInstances = locationsList.size();

if (LOG.isDebugEnabled()) {
for (TScanRangeLocations locations : locationsList) {
LOG.debug("Scan range is {}", locations);
}
}

if (loadJobId != -1) {
LOG.info("broker load job {} with txn {} has {} scan range: {}",
loadJobId, txnId, locationsList.size(),
Expand Down
Loading

0 comments on commit 5b39560

Please sign in to comment.