Skip to content

Commit

Permalink
Fix bug that load statistic in show load result is incorrect (apache#…
Browse files Browse the repository at this point in the history
…1871)

Each load job has several load tasks, and each task is a query plan
with several plan fragments. Each plan fragment reports its query profile
independently, so we need to collect each plan fragment's report separately.
  • Loading branch information
  • Loading branch information
morningman authored Sep 25, 2019
1 parent ce6fb1c commit f3bbdfe
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

/**
* There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn.
Expand Down Expand Up @@ -239,13 +238,11 @@ public void onTaskFailed(long taskId, FailMsg failMsg) {
// retry task
idToTasks.remove(loadTask.getSignature());
if (loadTask instanceof LoadLoadingTask) {
loadStatistic.numScannedRowsMap.remove(((LoadLoadingTask) loadTask).getLoadId());
loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId());
}
loadTask.updateRetryInfo();
idToTasks.put(loadTask.getSignature(), loadTask);
if (loadTask instanceof LoadLoadingTask) {
loadStatistic.numScannedRowsMap.put(((LoadLoadingTask) loadTask).getLoadId(), new AtomicLong(0));
}
// load id will be added to loadStatistic when executing this task
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
return;
}
Expand Down Expand Up @@ -365,7 +362,7 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme
// idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks.
// use newLoadingTasks to save new created loading tasks and submit them later.
newLoadingTasks.add(task);
loadStatistic.numScannedRowsMap.put(loadId, new AtomicLong(0));
// load id will be added to loadStatistic when executing this task

// save all related tables and rollups in transaction state
TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(transactionId);
Expand Down
55 changes: 41 additions & 14 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Joiner;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.gson.Gson;

import org.apache.logging.log4j.LogManager;
Expand All @@ -67,7 +69,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback, Writable {
Expand Down Expand Up @@ -129,23 +130,48 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements

public static class LoadStatistic {
// number of rows processed on BE, this number will be updated periodically by query report.
// A load job may has several load tasks, so the map key is load task's plan load id.
public Map<TUniqueId, AtomicLong> numScannedRowsMap = Maps.newConcurrentMap();
// A load job may have several load tasks (queries), and each task has several fragments.
// Each fragment reports its progress independently.
// load task id -> fragment id -> rows count
private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();

// number of files to be loaded
public int fileNum = 0;
public long totalFileSizeB = 0;

public String toJson() {

// Register counter entries for a load task before it starts executing.
// Stale counters left by a previous run of the same load id are discarded
// first, then every fragment of this task starts counting from zero.
public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds) {
    counterTbl.rowMap().remove(loadId);
    fragmentIds.forEach(fragmentId -> counterTbl.put(loadId, fragmentId, 0L));
}

// Drop every fragment counter belonging to the given load task, e.g. when
// the task is retried and its old statistics must not be double counted.
// Clearing the row view writes through to the backing table.
public synchronized void removeLoad(TUniqueId loadId) {
    counterTbl.row(loadId).clear();
}

// Record the latest scanned-row count reported by one fragment of a load task.
// Reports for unknown (loadId, fragmentId) pairs — e.g. from a task that has
// already been removed — are ignored rather than inserted.
public synchronized void updateLoad(TUniqueId loadId, TUniqueId fragmentId, long rows) {
    if (!counterTbl.contains(loadId, fragmentId)) {
        return;
    }
    counterTbl.put(loadId, fragmentId, rows);
}

// Remove all fragment counters of every load task, used when the whole
// job is finished or cancelled and its statistics are no longer needed.
public synchronized void clearAllLoads() {
    counterTbl.clear();
}

public synchronized String toJson() {
long total = 0;
for (AtomicLong atomicLong : numScannedRowsMap.values()) {
total += atomicLong.get();
for (long rows : counterTbl.values()) {
total += rows;
}

Map<String, Object> details = Maps.newHashMap();
details.put("ScannedRows", total);
details.put("FileNumber", fileNum);
details.put("FileSize", totalFileSizeB);
details.put("TaskNumber", numScannedRowsMap.size());
details.put("TaskNumber", counterTbl.rowMap().size());
Gson gson = new Gson();
return gson.toJson(details);
}
Expand Down Expand Up @@ -222,11 +248,12 @@ public long getTransactionId() {
return transactionId;
}

public void updateScannedRows(TUniqueId loadId, long scannedRows) {
AtomicLong atomicLong = loadStatistic.numScannedRowsMap.get(loadId);
if (atomicLong != null) {
atomicLong.set(scannedRows);
}
// Initialize the per-fragment scanned-row counters for one load task (plan),
// delegating to this job's LoadStatistic. Called before the task executes.
public void initScannedRows(TUniqueId loadId, Set<TUniqueId> fragmentIds) {
    loadStatistic.initLoad(loadId, fragmentIds);
}

// Update the scanned-row counter of a single fragment of a load task,
// driven by the periodic query profile report from the BE.
public void updateScannedRows(TUniqueId loadId, TUniqueId fragmentId, long scannedRows) {
    loadStatistic.updateLoad(loadId, fragmentId, scannedRows);
}

public void setLoadFileInfo(int fileNum, long fileSize) {
Expand Down Expand Up @@ -510,7 +537,7 @@ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
}
}
idToTasks.clear();
loadStatistic.numScannedRowsMap.clear();
loadStatistic.clearAllLoads();

// set failMsg and state
this.failMsg = failMsg;
Expand Down
11 changes: 9 additions & 2 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,17 @@ private boolean needSave(LoadJob loadJob) {
return false;
}

public void updateJobScannedRows(Long jobId, TUniqueId loadId, long scannedRows) {
public void initJobScannedRows(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds) {
LoadJob job = idToLoadJob.get(jobId);
if (job != null) {
job.updateScannedRows(loadId, scannedRows);
job.initScannedRows(loadId, fragmentIds);
}
}

// Forward a fragment's scanned-row report to the owning load job.
// A report for a job that no longer exists is silently dropped, since the
// job may already be finished or cancelled when a late report arrives.
public void updateJobScannedRows(Long jobId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows) {
    LoadJob loadJob = idToLoadJob.get(jobId);
    if (loadJob == null) {
        return;
    }
    loadJob.updateScannedRows(loadId, fragmentId, scannedRows);
}

Expand Down
6 changes: 4 additions & 2 deletions fe/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,11 @@ public void exec() throws Exception {
toBrpcHost(topParams.instanceExecParams.get(0).host),
queryOptions.query_timeout * 1000);
} else {
// This is a insert statement.
// This is a load process.
this.queryOptions.setIs_report_success(true);
deltaUrls = Lists.newArrayList();
loadCounters = Maps.newHashMap();
Catalog.getCurrentCatalog().getLoadManager().initJobScannedRows(jobId, queryId, instanceIds);
}

// to keep things simple, make async Cancel() calls wait until plan fragment
Expand Down Expand Up @@ -1191,7 +1192,8 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
}

if (params.isSetLoaded_rows()) {
Catalog.getCurrentCatalog().getLoadManager().updateJobScannedRows(jobId, params.query_id, params.loaded_rows);
Catalog.getCurrentCatalog().getLoadManager().updateJobScannedRows(
jobId, params.query_id, params.fragment_instance_id, params.loaded_rows);
}

return;
Expand Down

0 comments on commit f3bbdfe

Please sign in to comment.