Skip to content

Commit

Permalink
[feature](Load)(step2)support nereids load job schedule (apache#26356)
Browse files Browse the repository at this point in the history
We integrate the new load job manager into the new job scheduling framework so that the insert-into task can be scheduled after the broker load SQL is converted to an insert-into TVF (table-valued function) SQL.

issue: apache#24221

Now support:
1. load data by TVF insert-into SQL, but only for simple load (columns must be defined in the table)
2. show load stmt
- job id, label name, job state, time info
- simple progress
3. cancel load from db
4. support enabling the new load path through Config.enable_nereids_load
5. can replay jobs after restarting Doris

TODO:
- support partition insert job
- support show statistics from BE
- support multiple task and collect task statistic
- support transactional task
- need to add unit test cases
  • Loading branch information
wsjz authored and stephen committed Dec 28, 2023
1 parent 9dce5e2 commit d87de60
Show file tree
Hide file tree
Showing 33 changed files with 1,324 additions and 374 deletions.
12 changes: 12 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,18 @@ public class Config extends ConfigBase {
@ConfField(masterOnly = true)
public static boolean enable_hms_events_incremental_sync = false;

/**
 * If set to true, load statements are planned by the Nereids (new) optimizer:
 * the broker load SQL is rewritten into an insert-into-TVF job. If the new
 * path fails, execution falls back to the legacy load statement.
 */
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL, description = {
"当前默认设置为 false,开启后支持使用新优化器的load语句导入数据,失败后会降级旧的load语句。",
"Now default set to false. After this function is enabled, the load statement of "
+ "the new optimizer can be used to import data. If this function fails, "
+ "the old load statement will be degraded."})
public static boolean enable_nereids_load = false;


/**
* Maximum number of events to poll in each RPC.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ statement
(withRemoteStorageSystem)?
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #load
| LOAD LABEL lableName=identifier
LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
resourceDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #resourceLoad
| LOAD mysqlDataDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #mysqlLoad
Expand Down Expand Up @@ -131,7 +126,7 @@ dataDesc
(PARTITION partition=identifierList)?
(COLUMNS TERMINATED BY comma=STRING_LITERAL)?
(LINES TERMINATED BY separator=STRING_LITERAL)?
(FORMAT AS format=identifier)?
(FORMAT AS format=identifierOrStringLiteral)?
(columns=identifierList)?
(columnsFromPath=colFromPath)?
(columnMapping=colMappingList)?
Expand Down Expand Up @@ -167,6 +162,11 @@ refreshMethod
: COMPLETE | AUTO
;

// Accepts either a bare identifier or a quoted string literal.
// Used by dataDesc's FORMAT AS clause so format names may be quoted.
identifierOrStringLiteral
: identifier
| STRING_LITERAL
;

identifierOrText
: errorCapturingIdentifier
| STRING_LITERAL
Expand Down Expand Up @@ -224,7 +224,8 @@ mappingExpr
;

withRemoteStorageSystem
: WITH S3 LEFT_PAREN
: resourceDesc
| WITH S3 LEFT_PAREN
brokerProperties=propertyItemList
RIGHT_PAREN
| WITH HDFS LEFT_PAREN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ public void analyze(Analyzer analyzer) throws UserException {
analyzerSqlStmt();
// check its insert stmt,currently only support insert stmt
//todo when support other stmt,need to check stmt type and generate jobInstance
InsertJob job = new InsertJob();
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);
job.setCreateTimeMs(System.currentTimeMillis());
TimerDefinition timerDefinition = new TimerDefinition();

if (null != onceJobStartTimestamp) {
Expand Down Expand Up @@ -148,17 +146,19 @@ public void analyze(Analyzer analyzer) throws UserException {
}
checkJobName(labelName.getLabelName());
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setJobConfig(jobExecutionConfiguration);

job.setComment(comment);
job.setCurrentDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
job.setJobStatus(JobStatus.RUNNING);
job.setJobId(Env.getCurrentEnv().getNextId());
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
job.setExecuteSql(executeSql);
// create job use label name as its job name
String jobName = labelName.getLabelName();
InsertJob job = new InsertJob(jobName,
JobStatus.RUNNING,
labelName.getDbName(),
comment,
ConnectContext.get().getCurrentUserIdentity(),
jobExecutionConfiguration,
System.currentTimeMillis(),
executeSql);
//job.checkJobParams();
jobInstance = job;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ public Set<JobState> getStates() {
return states;
}

/**
 * Parses the raw state filter string into the load-v2 job state enum.
 *
 * @return the parsed state, or null when no state filter was supplied
 * @throws IllegalArgumentException if the value matches no JobState constant
 */
public org.apache.doris.load.loadv2.JobState getStateV2() {
    // An absent or empty filter means "no state restriction".
    return (stateValue == null || stateValue.isEmpty())
            ? null
            : org.apache.doris.load.loadv2.JobState.valueOf(stateValue);
}

/** Whether the filter requires an exact (accurate) match rather than a pattern match. */
public boolean isAccurateMatch() {
return isAccurateMatch;
}
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.jobs.load.LabelProcessor;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
Expand Down Expand Up @@ -362,6 +363,7 @@ public class Env {

private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;

private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
Expand Down Expand Up @@ -641,8 +643,11 @@ private Env(boolean isCheckpointCatalog) {
}
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
this.transientTaskManager = new TransientTaskManager();

this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
this.isElectable = false;
Expand Down Expand Up @@ -3907,6 +3912,10 @@ public JobManager getJobManager() {
return jobManager;
}

public LabelProcessor getLabelProcessor() {
return labelProcessor;
}

public TransientTaskManager getTransientTaskManager() {
return transientTaskManager;
}
Expand Down
98 changes: 85 additions & 13 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskStatus;
Expand Down Expand Up @@ -76,14 +78,55 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
private JobExecutionConfiguration jobConfig;

@SerializedName(value = "ctms")
private Long createTimeMs;
private long createTimeMs;

@SerializedName(value = "sql")
String executeSql;
@SerializedName(value = "stm")
private long startTimeMs = -1L;

@SerializedName(value = "ftm")
private long finishTimeMs;

@SerializedName(value = "sql")
String executeSql;

// No-arg constructor; presumably required for GSON deserialization — TODO confirm.
public AbstractJob() {}

/** Creates a job shell carrying only its id; all other fields keep their defaults. */
public AbstractJob(Long id) {
jobId = id;
}

/**
 * Convenience constructor for jobs that need neither an executeSql nor
 * pre-existing running tasks (e.g. load jobs). The creation time is set
 * to the current wall-clock time.
 */
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
}

/**
 * Full constructor: initializes every field of the job.
 *
 * @param jobId unique job id
 * @param jobName display name (load jobs use the label name)
 * @param jobStatus initial job status
 * @param currentDbName database the job runs against
 * @param comment user-supplied comment
 * @param createUser identity that created the job
 * @param jobConfig scheduling configuration (execute type / timer)
 * @param createTimeMs creation timestamp in epoch milliseconds
 * @param executeSql SQL to execute; may be null for jobs that do not need it
 * @param runningTasks initial running-task list; may be null
 */
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
List<T> runningTasks) {
this.jobId = jobId;
this.jobName = jobName;
this.jobStatus = jobStatus;
this.currentDbName = currentDbName;
this.comment = comment;
this.createUser = createUser;
this.jobConfig = jobConfig;
this.createTimeMs = createTimeMs;
this.executeSql = executeSql;
this.runningTasks = runningTasks;
}

private List<T> runningTasks = new ArrayList<>();

@Override
Expand All @@ -109,6 +152,10 @@ public void cancelAllTasks() throws JobException {
.add("Comment")
.build();

// Derives a job id from the current nanoTime plus a random int.
// NOTE(review): this is NOT guaranteed unique (collisions possible across
// concurrent calls or restarts) and is non-monotonic — consider a centralized
// id generator (e.g. the Env-wide id allocator). TODO confirm.
protected static long getNextJobId() {
return System.nanoTime() + RandomUtils.nextInt();
}

@Override
public void cancelTaskById(long taskId) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
Expand Down Expand Up @@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
return createTasks(taskType, taskContext);
}

public void initTasks(List<? extends AbstractTask> tasks) {
public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
if (CollectionUtils.isEmpty(getRunningTasks())) {
runningTasks = new ArrayList<>();
}
tasks.forEach(task -> {
task.setJobId(jobId);
task.setTaskId(getNextId());
task.setTaskType(taskType);
task.setJobId(getJobId());
task.setCreateTimeMs(System.currentTimeMillis());
task.setStatus(TaskStatus.PENDING);
});
if (CollectionUtils.isEmpty(getRunningTasks())) {
setRunningTasks(new ArrayList<>());
}
getRunningTasks().addAll((Collection<? extends T>) tasks);
getRunningTasks().addAll(tasks);
this.startTimeMs = System.currentTimeMillis();
}

public void checkJobParams() {
Expand Down Expand Up @@ -208,10 +256,22 @@ public void updateJobStatus(JobStatus newJobStatus) throws JobException {
public static AbstractJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
job.setRunningTasks(new ArrayList<>());
job.runningTasks = new ArrayList<>();
return job;
}

/** Writes a create-job entry to the edit log so the job can be replayed after restart. */
public void logCreateOperation() {
Env.getCurrentEnv().getEditLog().logCreateJob(this);
}

/** Writes an end-job entry (terminal state) to the edit log for replay after restart. */
public void logFinalOperation() {
Env.getCurrentEnv().getEditLog().logEndJob(this);
}

/** Writes an update-job entry to the edit log for replay after restart. */
public void logUpdateOperation() {
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}

@Override
public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
Expand Down Expand Up @@ -303,7 +363,19 @@ public ShowResultSetMetaData getJobMetaData() {
return builder.build();
}

private static long getNextId() {
return System.nanoTime() + RandomUtils.nextInt();
// Registration hook; default is a no-op — subclasses override to add side effects.
@Override
public void onRegister() throws JobException {}

// Unregistration hook; default is a no-op — subclasses override to add cleanup.
@Override
public void onUnRegister() throws JobException {}

// Replays a create-job edit-log entry; this base implementation only logs.
@Override
public void onReplayCreate() throws JobException {
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay create scheduler job").build());
}

// Replays an end-job (finished/cancelled) edit-log entry; this base implementation only logs.
// NOTE(review): the log message says "replay delete" while the method replays an
// end-of-job entry — confirm the wording is intentional.
@Override
public void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException {
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay delete scheduler job").build());
}
}
25 changes: 24 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {

/**
* Cancels all running tasks of this job.
*
* @throws JobException If cancelling a running task fails.
*/
void cancelAllTasks() throws JobException;

/**
 * Invoked when the job is registered with the job manager.
 * @throws JobException If register job failed.
 */
void onRegister() throws JobException;

/**
 * Invoked when the job is unregistered (removed) from the job manager.
 * @throws JobException If unregister handling failed.
 */
void onUnRegister() throws JobException;

/**
 * Replays a create-job entry from the edit log after restart.
 * @throws JobException If replay create failed.
 */
void onReplayCreate() throws JobException;

/**
 * Replays a finished or cancelled job entry from the edit log after restart.
 * @param replayJob the job state read back from the edit log
 * @throws JobException If replay end failed.
 */
void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException;

/**
* Notifies the job when a task execution fails.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum JobExecuteType {
*/
MANUAL,
/**
* The job will be executed immediately.
* The job will be executed only once and immediately.
*/
INSTANT,
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public enum JobStatus {
* The stop state cannot be resumed
*/
STOPPED,

/**
* When the task is finished, the finished state will be triggered.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

public enum JobType {
INSERT,
MV
MV,
}
Loading

0 comments on commit d87de60

Please sign in to comment.