Skip to content

Commit 1792700

Browse files
authored
fix: refactor taskmanager config and support deleting HDFS files when dropping tables (#3369)
1 parent 330d171 commit 1792700

File tree

18 files changed

+581
-275
lines changed

18 files changed

+581
-275
lines changed

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/config/TaskManagerConfig.java

Lines changed: 403 additions & 148 deletions
Large diffs are not rendered by default.

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/dao/JobIdGenerator.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,28 +28,29 @@ public class JobIdGenerator {
2828
static {
2929
try {
3030
zkClient = new ZKClient(ZKConfig.builder()
31-
.cluster(TaskManagerConfig.ZK_CLUSTER)
32-
.namespace(TaskManagerConfig.ZK_ROOT_PATH)
33-
.sessionTimeout(TaskManagerConfig.ZK_SESSION_TIMEOUT)
34-
.baseSleepTime(TaskManagerConfig.ZK_BASE_SLEEP_TIME)
35-
.connectionTimeout(TaskManagerConfig.ZK_CONNECTION_TIMEOUT)
36-
.maxConnectWaitTime(TaskManagerConfig.ZK_MAX_CONNECT_WAIT_TIME)
37-
.maxRetries(TaskManagerConfig.ZK_MAX_RETRIES)
31+
.cluster(TaskManagerConfig.getZkCluster())
32+
.namespace(TaskManagerConfig.getZkRootPath())
33+
.sessionTimeout(TaskManagerConfig.getZkSessionTimeout())
34+
.baseSleepTime(TaskManagerConfig.getZkBaseSleepTime())
35+
.connectionTimeout(TaskManagerConfig.getZkConnectionTimeout())
36+
.maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime())
37+
.maxRetries(TaskManagerConfig.getZkMaxRetries())
3838
.build());
3939
zkClient.connect();
40+
4041
// Initialize zk nodes
41-
zkClient.createNode(TaskManagerConfig.ZK_ROOT_PATH, "".getBytes());
42-
zkClient.createNode(TaskManagerConfig.ZK_TASKMANAGER_PATH, "".getBytes());
42+
zkClient.createNode(TaskManagerConfig.getZkRootPath(), "".getBytes());
43+
zkClient.createNode(TaskManagerConfig.getZkTaskmanagerPath(), "".getBytes());
4344

4445
int lastMaxJobId = 0;
45-
if (zkClient.checkExists(TaskManagerConfig.ZK_MAX_JOB_ID_PATH)) {
46+
if (zkClient.checkExists(TaskManagerConfig.getZkMaxJobIdPath())) {
4647
// Get last max job id from zk
47-
lastMaxJobId = Integer.parseInt(zkClient.getNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH));
48+
lastMaxJobId = Integer.parseInt(zkClient.getNodeValue(TaskManagerConfig.getZkMaxJobIdPath()));
4849
}
4950
currentJobId = lastMaxJobId;
50-
maxJobId = lastMaxJobId + TaskManagerConfig.PREFETCH_JOBID_NUM;
51+
maxJobId = lastMaxJobId + TaskManagerConfig.getPrefetchJobidNum();
5152
// set max job id in zk
52-
zkClient.setNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH, String.valueOf(maxJobId).getBytes());
53+
zkClient.setNodeValue(TaskManagerConfig.getZkMaxJobIdPath(), String.valueOf(maxJobId).getBytes());
5354

5455
} catch (Exception e) {
5556
zkClient = null;
@@ -67,8 +68,8 @@ public static int getUniqueId() throws Exception {
6768
currentJobId += 1;
6869
if (currentJobId > maxJobId) {
6970
// Update zk before returning job id
70-
maxJobId += TaskManagerConfig.PREFETCH_JOBID_NUM;
71-
zkClient.setNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH, String.valueOf(maxJobId).getBytes());
71+
maxJobId += TaskManagerConfig.getPrefetchJobidNum();
72+
zkClient.setNodeValue(TaskManagerConfig.getZkMaxJobIdPath(), String.valueOf(maxJobId).getBytes());
7273
}
7374
return currentJobId;
7475
}

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/JobResultSaver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public boolean saveFile(int resultId, String jsonData) {
105105
return true;
106106
}
107107
// save to <log path>/tmp_result/<result_id>/<unique file name>
108-
String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.JOB_LOG_PATH, resultId);
108+
String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.getJobLogPath(), resultId);
109109
synchronized (this) {
110110
File saveP = new File(savePath);
111111
if (!saveP.exists()) {
@@ -151,7 +151,7 @@ public String readResult(int resultId, long timeoutMs) throws InterruptedExcepti
151151
}
152152
String output = "";
153153
// all finished, read csv from savePath
154-
String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.JOB_LOG_PATH, resultId);
154+
String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.getJobLogPath(), resultId);
155155
File saveP = new File(savePath);
156156
// If saveP not exists, means no real result saved. But it may use a uncleaned
157157
// path, whether read result succeed or not, we should delete it.
@@ -225,7 +225,7 @@ public void reset() throws IOException {
225225
synchronized (idStatus) {
226226
Collections.fill(idStatus, 0);
227227
}
228-
String tmpResultDir = String.format("%s/tmp_result", TaskManagerConfig.JOB_LOG_PATH);
228+
String tmpResultDir = String.format("%s/tmp_result", TaskManagerConfig.getJobLogPath());
229229
// delete anyway
230230
FileUtils.forceDelete(new File(tmpResultDir));
231231
}

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com._4paradigm.openmldb.taskmanager.config.ConfigException;
2020
import com._4paradigm.openmldb.taskmanager.tracker.JobTrackerService;
21-
import com._4paradigm.openmldb.taskmanager.util.VersionUtil;
2221
import com._4paradigm.openmldb.taskmanager.zk.FailoverWatcher;
2322
import lombok.extern.slf4j.Slf4j;
2423
import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig;
@@ -45,7 +44,7 @@ public class TaskManagerServer {
4544
* @throws ConfigException if config file does not exist or some configs are incorrect.
4645
*/
4746
public TaskManagerServer() throws ConfigException {
48-
TaskManagerConfig.parse();
47+
TaskManagerConfig.print();
4948
}
5049

5150
/**
@@ -69,7 +68,7 @@ public void start(Boolean blocking) throws ConfigException, IOException, Interru
6968
logger.info("The server runs and prepares for leader election");
7069
if (failoverWatcher.blockUntilActive()) {
7170
logger.info("The server becomes active master and prepare to do business logic");
72-
if (TaskManagerConfig.TRACK_UNFINISHED_JOBS) {
71+
if (TaskManagerConfig.getTrackUnfinishedJobs()) {
7372
// Start threads to track unfinished jobs
7473
JobTrackerService.startTrackerThreads();
7574
}
@@ -97,14 +96,14 @@ public void startRpcServer(Boolean blocking) throws ConfigException, Interrupted
9796
RpcServerOptions options = new RpcServerOptions();
9897
options.setReceiveBufferSize(64 * 1024 * 1024);
9998
options.setSendBufferSize(64 * 1024 * 1024);
100-
options.setIoThreadNum(TaskManagerConfig.IO_THREAD);
101-
options.setWorkThreadNum(TaskManagerConfig.WORKER_THREAD);
102-
options.setKeepAliveTime(TaskManagerConfig.CHANNEL_KEEP_ALIVE_TIME);
103-
rpcServer = new RpcServer(TaskManagerConfig.PORT, options);
99+
options.setIoThreadNum(TaskManagerConfig.getServerIoThreads());
100+
options.setWorkThreadNum(TaskManagerConfig.getServerWorkerThreads());
101+
options.setKeepAliveTime(TaskManagerConfig.getChannelKeepAliveTime());
102+
rpcServer = new RpcServer(TaskManagerConfig.getServerPort(), options);
104103
rpcServer.registerService(new TaskManagerImpl());
105104
rpcServer.start();
106-
log.info("Start TaskManager on {} with worker thread number {}", TaskManagerConfig.PORT,
107-
TaskManagerConfig.WORKER_THREAD);
105+
log.info("Start TaskManager on {} with worker thread number {}", TaskManagerConfig.getServerPort(),
106+
TaskManagerConfig.getServerWorkerThreads());
108107

109108
if (blocking) {
110109
// make server keep running

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,17 @@ public TaskManagerImpl() throws InterruptedException, ConfigException {
7373
*/
7474
private void initExternalFunction() throws InterruptedException {
7575
ZKClient zkClient = new ZKClient(ZKConfig.builder()
76-
.cluster(TaskManagerConfig.ZK_CLUSTER)
77-
.namespace(TaskManagerConfig.ZK_ROOT_PATH)
78-
.sessionTimeout(TaskManagerConfig.ZK_SESSION_TIMEOUT)
79-
.baseSleepTime(TaskManagerConfig.ZK_BASE_SLEEP_TIME)
80-
.connectionTimeout(TaskManagerConfig.ZK_CONNECTION_TIMEOUT)
81-
.maxConnectWaitTime(TaskManagerConfig.ZK_MAX_CONNECT_WAIT_TIME)
82-
.maxRetries(TaskManagerConfig.ZK_MAX_RETRIES)
76+
.cluster(TaskManagerConfig.getZkCluster())
77+
.namespace(TaskManagerConfig.getZkRootPath())
78+
.sessionTimeout(TaskManagerConfig.getZkSessionTimeout())
79+
.baseSleepTime(TaskManagerConfig.getZkBaseSleepTime())
80+
.connectionTimeout(TaskManagerConfig.getZkConnectionTimeout())
81+
.maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime())
82+
.maxRetries(TaskManagerConfig.getZkMaxRetries())
8383
.build());
8484
zkClient.connect();
8585

86-
String funPath = TaskManagerConfig.ZK_ROOT_PATH + "/data/function";
86+
String funPath = TaskManagerConfig.getZkRootPath() + "/data/function";
8787
try {
8888
List<String> funNames = zkClient.getChildren(funPath);
8989
for (String name : funNames) {
@@ -220,7 +220,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlReques
220220
// HOST can't be 0.0.0.0 if distributed or spark is not local
221221
confMap.put("spark.openmldb.savejobresult.http",
222222
String.format("http://%s:%d/openmldb.taskmanager.TaskManagerServer/SaveJobResult",
223-
TaskManagerConfig.HOST, TaskManagerConfig.PORT));
223+
TaskManagerConfig.getServerHost(), TaskManagerConfig.getServerPort()));
224224
// we can't get spark job id here, so we use JobResultSaver id, != spark job id
225225
// if too much running jobs to save result, throw exception
226226
int resultId = jobResultSaver.genResultId();
@@ -234,7 +234,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlReques
234234
if (finalJobInfo.isSuccess()) {
235235
// wait for all files of result saved and read them, large timeout
236236
// TODO: Test for K8S backend
237-
String output = jobResultSaver.readResult(resultId, TaskManagerConfig.BATCH_JOB_RESULT_MAX_WAIT_TIME);
237+
String output = jobResultSaver.readResult(resultId, TaskManagerConfig.getBatchJobResultMaxWaitTime());
238238
return TaskManager.RunBatchSqlResponse.newBuilder().setCode(StatusCode.SUCCESS).setOutput(output)
239239
.build();
240240
} else {
@@ -253,7 +253,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlReques
253253
// rpc max time is CHANNEL_KEEP_ALIVE_TIME, so we don't need to wait too long
254254
private JobInfo busyWaitJobInfo(int jobId, int waitSeconds) throws InterruptedException {
255255
long maxWaitEnd = System.currentTimeMillis()
256-
+ (waitSeconds == 0 ? TaskManagerConfig.CHANNEL_KEEP_ALIVE_TIME : waitSeconds) * 1000;
256+
+ (waitSeconds == 0 ? TaskManagerConfig.getChannelKeepAliveTime() : waitSeconds) * 1000;
257257
while (System.currentTimeMillis() < maxWaitEnd) {
258258
Option<JobInfo> info = JobInfoManager.getJob(jobId);
259259
if (info.isEmpty()) {

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/udf/ExternalFunctionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class ExternalFunctionManager {
3232
static private Map<String, String> nameFileMap = new ConcurrentHashMap<>();
3333

3434
static public String getLibraryFilePath(String libraryFileName) {
35-
return Paths.get(TaskManagerConfig.EXTERNAL_FUNCTION_DIR, libraryFileName).toString();
35+
return Paths.get(TaskManagerConfig.getExternalFunctionDir(), libraryFileName).toString();
3636
}
3737

3838
static public void addFunction(String fnName, String libraryFileName) throws Exception {

java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ public class FailoverWatcher implements Watcher {
5151
*/
5252
public FailoverWatcher() throws IOException {
5353

54-
baseZnode = TaskManagerConfig.ZK_ROOT_PATH + "/taskmanager";
54+
baseZnode = TaskManagerConfig.getZkRootPath() + "/taskmanager";
5555
masterZnode = baseZnode + "/leader";
56-
zkQuorum = TaskManagerConfig.ZK_CLUSTER;
57-
sessionTimeout = TaskManagerConfig.ZK_SESSION_TIMEOUT;
56+
zkQuorum = TaskManagerConfig.getZkCluster();
57+
sessionTimeout = TaskManagerConfig.getZkSessionTimeout();
5858
connectRetryTimes = 3;
59-
String serverHost = TaskManagerConfig.HOST;
60-
int serverPort = TaskManagerConfig.PORT;
59+
String serverHost = TaskManagerConfig.getServerHost();
60+
int serverPort = TaskManagerConfig.getServerPort();
6161
hostPort = new HostPort(serverHost, serverPort);
6262

6363
connectZooKeeper();
@@ -91,7 +91,7 @@ protected void connectZooKeeper() throws IOException {
9191
*/
9292
protected void initZnode() {
9393
try {
94-
ZooKeeperUtil.createAndFailSilent(this, TaskManagerConfig.ZK_ROOT_PATH);
94+
ZooKeeperUtil.createAndFailSilent(this, TaskManagerConfig.getZkRootPath());
9595
ZooKeeperUtil.createAndFailSilent(this, baseZnode);
9696
} catch (Exception e) {
9797
LOG.fatal("Error to create znode " + baseZnode

java/openmldb-taskmanager/src/main/resources/taskmanager.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,5 @@ spark.default.conf=
2727
spark.eventLog.dir=
2828
spark.yarn.maxAppAttempts=1
2929
batchjob.jar.path=
30-
namenode.uri=
3130
offline.data.prefix=file:///tmp/openmldb_offline_storage/
3231
hadoop.conf.dir=

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import com._4paradigm.openmldb.sdk.SdkOption
2020
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor
2121
import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig
2222
import com._4paradigm.openmldb.taskmanager.dao.{JobIdGenerator, JobInfo}
23+
import com._4paradigm.openmldb.taskmanager.util.HdfsUtil
2324
import com._4paradigm.openmldb.taskmanager.yarn.YarnClientUtil
2425
import org.slf4j.LoggerFactory
2526
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@@ -42,8 +43,8 @@ object JobInfoManager {
4243
private val JOB_INFO_TABLE_NAME = "JOB_INFO"
4344

4445
private val option = new SdkOption
45-
option.setZkCluster(TaskManagerConfig.ZK_CLUSTER)
46-
option.setZkPath(TaskManagerConfig.ZK_ROOT_PATH)
46+
option.setZkCluster(TaskManagerConfig.getZkCluster)
47+
option.setZkPath(TaskManagerConfig.getZkRootPath)
4748
val sqlExecutor = new SqlClusterExecutor(option)
4849
sqlExecutor.executeSQL("", "set @@execute_mode='online';")
4950

@@ -52,7 +53,7 @@ object JobInfoManager {
5253
val startTime = new java.sql.Timestamp(Calendar.getInstance.getTime().getTime())
5354
val initialState = "Submitted"
5455
val parameter = if (args != null && args.length>0) args.mkString(",") else ""
55-
val cluster = sparkConf.getOrElse("spark.master", TaskManagerConfig.SPARK_MASTER)
56+
val cluster = sparkConf.getOrElse("spark.master", TaskManagerConfig.getSparkMaster)
5657

5758
// TODO: Parse if run in yarn or local
5859
val jobInfo = new JobInfo(jobId, jobType, initialState, startTime, null, parameter, cluster, "", "")
@@ -210,12 +211,8 @@ object JobInfoManager {
210211
FileUtils.deleteDirectory(dir)
211212

212213
} else if (filePath.startsWith("hdfs://")) {
213-
val conf = new Configuration();
214-
// TODO: Get namenode uri from config file
215-
val namenodeUri = TaskManagerConfig.NAMENODE_URI
216-
val hdfs = FileSystem.get(URI.create(s"hdfs://$namenodeUri"), conf)
217-
hdfs.delete(new Path(filePath), true)
218-
214+
logger.info(s"Try to delete the HDFS path ${filePath}")
215+
HdfsUtil.deleteHdfsDir(filePath)
219216
} else {
220217
throw new Exception(s"Get unsupported file path: $filePath")
221218
}

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/LogManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ object LogManager {
2626
private val logger = LoggerFactory.getLogger(this.getClass)
2727

2828
def getJobLogFile(id: Int): File = {
29-
Paths.get(TaskManagerConfig.JOB_LOG_PATH, s"job_${id}.log").toFile
29+
Paths.get(TaskManagerConfig.getJobLogPath, s"job_${id}.log").toFile
3030
}
3131

3232
def getJobErrorLogFile(id: Int): File = {
33-
Paths.get(TaskManagerConfig.JOB_LOG_PATH, s"job_${id}_error.log").toFile
33+
Paths.get(TaskManagerConfig.getJobLogPath, s"job_${id}_error.log").toFile
3434
}
3535

3636
def getFileContent(inputFile: File): String = {

0 commit comments

Comments
 (0)