Skip to content

Commit efbaaff

Browse files
committed
[KYUUBI #4390] Allow user to provide batch id on submitting batch job
### _Why are the changes needed?_ This PR proposes to allow the user to provide a batch id on submitting a batch job. If the batch id already existed in metastore, Kyuubi ignores this submission and just returns the existing one, w/ a marker in response, this could avoid duplicated batch job submission. Talking about the implementation, the key things are How does the user set the custom batch id? - User can optionally set the `kyuubi.batch.id` in `conf: Map[String, String]`, and the value must be a UUID, for Java users, it can be generated by `UUID.randomUUID().toString()` How does the Kyuubi Server detect the duplication? - It's simple in single Kyuubi Server instance case, Kyuubi just needs to look up the metastore before creating a batch job - In HA mode, suppose the user requests to create the batch jobs w/ the same batch id concurrently, multiple Kyuubi Servers may process the request and try to insert to metastore DB at the same time, but only the first insertion success, others will fail w/ "duplicated key", Kyuubi Server needs to catch this error and return the existing batch job information instead of creating a new one. How does the user know if the returned batch job is new created or duplicated? - a new field `batchInfo: Map[String, String]` is added to the response, and for duplicated batch job, `"kyuubi.batch.duplicated": "true"` will be contained. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4390 from pan3793/batch-id. Closes #4390 b6917ba [Cheng Pan] move constant to rest client 79ef1b5 [Cheng Pan] flaky test f822285 [Cheng Pan] it 88bdfa5 [Cheng Pan] ut fd8bc22 [Cheng Pan] ut c820f5e [Cheng Pan] Support user provided batch id on batch job submission Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 885d0c7 commit efbaaff

File tree

13 files changed

+200
-89
lines changed

13 files changed

+200
-89
lines changed

integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
package org.apache.kyuubi.kubernetes.test.spark
1919

20+
import java.util.UUID
21+
2022
import scala.collection.JavaConverters._
2123
import scala.concurrent.duration._
2224

2325
import org.apache.hadoop.conf.Configuration
2426
import org.apache.hadoop.net.NetUtils
2527

26-
import org.apache.kyuubi.{BatchTestHelper, KyuubiException, Logging, Utils, WithKyuubiServer, WithSimpleDFSService}
28+
import org.apache.kyuubi._
29+
import org.apache.kyuubi.client.util.BatchUtils._
2730
import org.apache.kyuubi.config.KyuubiConf
2831
import org.apache.kyuubi.config.KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_HOST
2932
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, KubernetesApplicationOperation}
@@ -134,7 +137,8 @@ class KyuubiOperationKubernetesClusterClientModeSuite
134137
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
135138

136139
test("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
137-
val batchRequest = newSparkBatchRequest(conf.getAll)
140+
val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
141+
KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
138142

139143
val sessionHandle = sessionManager.openBatchSession(
140144
"kyuubi",
@@ -193,7 +197,8 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
193197
"spark.kubernetes.driver.pod.name",
194198
driverPodNamePrefix + "-" + System.currentTimeMillis())
195199

196-
val batchRequest = newSparkBatchRequest(conf.getAll)
200+
val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
201+
KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
197202

198203
val sessionHandle = sessionManager.openBatchSession(
199204
"runner",

kyuubi-common/src/main/scala/org/apache/kyuubi/util/JdbcUtils.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,12 @@ object JdbcUtils extends Logging {
104104
case _ => "(empty)"
105105
}
106106
}
107+
108+
def isDuplicatedKeyDBErr(cause: Throwable): Boolean = {
109+
val duplicatedKeyKeywords = Seq(
110+
"duplicate key value in a unique or primary key constraint or unique index", // Derby
111+
"Duplicate entry" // MySQL
112+
)
113+
duplicatedKeyKeywords.exists(cause.getMessage.contains)
114+
}
107115
}

kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.kyuubi.client.api.v1.dto;
1919

20+
import java.util.Collections;
21+
import java.util.Map;
2022
import java.util.Objects;
2123
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
2224
import org.apache.commons.lang3.builder.ToStringStyle;
@@ -35,6 +37,7 @@ public class Batch {
3537
private String state;
3638
private long createTime;
3739
private long endTime;
40+
private Map<String, String> batchInfo = Collections.emptyMap();
3841

3942
public Batch() {}
4043

@@ -51,7 +54,8 @@ public Batch(
5154
String kyuubiInstance,
5255
String state,
5356
long createTime,
54-
long endTime) {
57+
long endTime,
58+
Map<String, String> batchInfo) {
5559
this.id = id;
5660
this.user = user;
5761
this.batchType = batchType;
@@ -65,6 +69,7 @@ public Batch(
6569
this.state = state;
6670
this.createTime = createTime;
6771
this.endTime = endTime;
72+
this.batchInfo = batchInfo;
6873
}
6974

7075
public String getId() {
@@ -171,6 +176,17 @@ public void setEndTime(long endTime) {
171176
this.endTime = endTime;
172177
}
173178

179+
public Map<String, String> getBatchInfo() {
180+
if (batchInfo == null) {
181+
return Collections.emptyMap();
182+
}
183+
return batchInfo;
184+
}
185+
186+
public void setBatchInfo(Map<String, String> batchInfo) {
187+
this.batchInfo = batchInfo;
188+
}
189+
174190
@Override
175191
public boolean equals(Object o) {
176192
if (this == o) return true;

kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ public class BatchRequest {
2929
private String resource;
3030
private String className;
3131
private String name;
32-
private Map<String, String> conf;
33-
private List<String> args;
32+
private Map<String, String> conf = Collections.emptyMap();
33+
private List<String> args = Collections.emptyList();
3434

3535
public BatchRequest() {}
3636

@@ -54,8 +54,6 @@ public BatchRequest(String batchType, String resource, String className, String
5454
this.resource = resource;
5555
this.className = className;
5656
this.name = name;
57-
this.conf = Collections.emptyMap();
58-
this.args = Collections.emptyList();
5957
}
6058

6159
public String getBatchType() {

kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/BatchUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.List;
2222
import java.util.Locale;
23+
import org.apache.kyuubi.client.api.v1.dto.Batch;
2324

2425
public final class BatchUtils {
2526
/** The batch has not been submitted to resource manager yet. */
@@ -40,6 +41,10 @@ public final class BatchUtils {
4041
public static List<String> terminalBatchStates =
4142
Arrays.asList(FINISHED_STATE, ERROR_STATE, CANCELED_STATE);
4243

44+
public static String KYUUBI_BATCH_ID_KEY = "kyuubi.batch.id";
45+
46+
public static String KYUUBI_BATCH_DUPLICATED_KEY = "kyuubi.batch.duplicated";
47+
4348
public static boolean isPendingState(String state) {
4449
return PENDING_STATE.equalsIgnoreCase(state);
4550
}
@@ -55,4 +60,8 @@ public static boolean isFinishedState(String state) {
5560
public static boolean isTerminalState(String state) {
5661
return state != null && terminalBatchStates.contains(state.toUpperCase(Locale.ROOT));
5762
}
63+
64+
public static boolean isDuplicatedSubmission(Batch batch) {
65+
return "true".equalsIgnoreCase(batch.getBatchInfo().get(KYUUBI_BATCH_DUPLICATED_KEY));
66+
}
5867
}

kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,35 +45,31 @@ public static CloseBatchResponse generateTestCloseBatchResp() {
4545
}
4646

4747
public static Batch generateTestBatch(String id) {
48-
Batch batch =
49-
new Batch(
50-
id,
51-
TEST_USERNAME,
52-
"spark",
53-
"batch_name",
54-
0,
55-
id,
56-
null,
57-
"RUNNING",
58-
null,
59-
"192.168.31.130:64573",
60-
"RUNNING",
61-
BATCH_CREATE_TIME,
62-
0);
63-
64-
return batch;
48+
return new Batch(
49+
id,
50+
TEST_USERNAME,
51+
"spark",
52+
"batch_name",
53+
0,
54+
id,
55+
null,
56+
"RUNNING",
57+
null,
58+
"192.168.31.130:64573",
59+
"RUNNING",
60+
BATCH_CREATE_TIME,
61+
0,
62+
Collections.emptyMap());
6563
}
6664

6765
public static BatchRequest generateTestBatchRequest() {
68-
BatchRequest batchRequest =
69-
new BatchRequest(
70-
"spark",
71-
"/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar",
72-
"org.apache.kyuubi.engine.spark.SparkSQLEngine",
73-
"test_batch",
74-
Collections.singletonMap("spark.driver.memory", "16m"),
75-
Collections.emptyList());
76-
return batchRequest;
66+
return new BatchRequest(
67+
"spark",
68+
"/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar",
69+
"org.apache.kyuubi.engine.spark.SparkSQLEngine",
70+
"test_batch",
71+
Collections.singletonMap("spark.driver.memory", "16m"),
72+
Collections.emptyList());
7773
}
7874

7975
public static GetBatchesResponse generateTestBatchesResponse() {
@@ -87,9 +83,8 @@ public static GetBatchesResponse generateTestBatchesResponse() {
8783
public static OperationLog generateTestOperationLog() {
8884
List<String> logs =
8985
Arrays.asList(
90-
"13:15:13.523 INFO org.apache.curator.framework.state."
91-
+ "ConnectionStateManager: State change: CONNECTED",
92-
"13:15:13.528 INFO org.apache.kyuubi." + "engine.EngineRef: Launching engine:");
86+
"13:15:13.523 INFO ConnectionStateManager: State change: CONNECTED",
87+
"13:15:13.528 INFO EngineRef: Launching engine:");
9388
return new OperationLog(logs, 2);
9489
}
9590
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package org.apache.kyuubi.server.api.v1
1919

2020
import java.io.InputStream
2121
import java.util
22-
import java.util.{Collections, Locale}
22+
import java.util.{Collections, Locale, UUID}
2323
import java.util.concurrent.ConcurrentHashMap
2424
import javax.ws.rs._
2525
import javax.ws.rs.core.MediaType
2626
import javax.ws.rs.core.Response.Status
2727

2828
import scala.collection.JavaConverters._
29+
import scala.util.{Failure, Success, Try}
2930
import scala.util.control.NonFatal
3031

3132
import io.swagger.v3.oas.annotations.media.{Content, Schema}
@@ -36,6 +37,7 @@ import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDat
3637
import org.apache.kyuubi.{Logging, Utils}
3738
import org.apache.kyuubi.client.api.v1.dto._
3839
import org.apache.kyuubi.client.exception.KyuubiRestException
40+
import org.apache.kyuubi.client.util.BatchUtils._
3941
import org.apache.kyuubi.config.KyuubiConf
4042
import org.apache.kyuubi.config.KyuubiReservedKeys._
4143
import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
@@ -45,6 +47,7 @@ import org.apache.kyuubi.server.api.v1.BatchesResource._
4547
import org.apache.kyuubi.server.metadata.MetadataManager
4648
import org.apache.kyuubi.server.metadata.api.Metadata
4749
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle}
50+
import org.apache.kyuubi.util.JdbcUtils
4851

4952
@Tag(name = "Batch")
5053
@Produces(Array(MediaType.APPLICATION_JSON))
@@ -105,7 +108,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
105108
session.connectionUrl,
106109
batchOpStatus.state.toString,
107110
session.createTime,
108-
batchOpStatus.completed)
111+
batchOpStatus.completed,
112+
Map.empty[String, String].asJava)
109113
}
110114

111115
private def buildBatch(
@@ -142,7 +146,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
142146
metadata.kyuubiInstance,
143147
currentBatchState,
144148
metadata.createTime,
145-
metadata.endTime)
149+
metadata.endTime,
150+
Map.empty[String, String].asJava)
146151
}.getOrElse(MetadataManager.buildBatch(metadata))
147152
}
148153

@@ -210,22 +215,55 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
210215
}
211216
request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))
212217

213-
val userName = fe.getSessionUser(request.getConf.asScala.toMap)
214-
val ipAddress = fe.getIpAddress
215-
request.setConf(
216-
(request.getConf.asScala ++ Map(
217-
KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
218-
KYUUBI_CLIENT_IP_KEY -> ipAddress,
219-
KYUUBI_SERVER_IP_KEY -> fe.host,
220-
KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
221-
KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
222-
val sessionHandle = sessionManager.openBatchSession(
223-
userName,
224-
"anonymous",
225-
ipAddress,
226-
request.getConf.asScala.toMap,
227-
request)
228-
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
218+
val userProvidedBatchId = request.getConf.asScala.get(KYUUBI_BATCH_ID_KEY)
219+
userProvidedBatchId.foreach { batchId =>
220+
try UUID.fromString(batchId)
221+
catch {
222+
case NonFatal(e) =>
223+
throw new IllegalArgumentException(s"$KYUUBI_BATCH_ID_KEY=$batchId must be an UUID", e)
224+
}
225+
}
226+
227+
userProvidedBatchId.flatMap { batchId =>
228+
Option(sessionManager.getBatchFromMetadataStore(batchId))
229+
} match {
230+
case Some(batch) =>
231+
markDuplicated(batch)
232+
case None =>
233+
val userName = fe.getSessionUser(request.getConf.asScala.toMap)
234+
val ipAddress = fe.getIpAddress
235+
val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)
236+
request.setConf(
237+
(request.getConf.asScala ++ Map(
238+
KYUUBI_BATCH_ID_KEY -> batchId,
239+
KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
240+
KYUUBI_CLIENT_IP_KEY -> ipAddress,
241+
KYUUBI_SERVER_IP_KEY -> fe.host,
242+
KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
243+
KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
244+
245+
Try {
246+
sessionManager.openBatchSession(
247+
userName,
248+
"anonymous",
249+
ipAddress,
250+
request.getConf.asScala.toMap,
251+
request)
252+
} match {
253+
case Success(sessionHandle) =>
254+
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
255+
case Failure(cause) if JdbcUtils.isDuplicatedKeyDBErr(cause) =>
256+
val batch = sessionManager.getBatchFromMetadataStore(batchId)
257+
assert(batch != null, s"can not find duplicated batch $batchId from metadata store")
258+
markDuplicated(batch)
259+
}
260+
}
261+
}
262+
263+
private def markDuplicated(batch: Batch): Batch = {
264+
warn(s"duplicated submission: ${batch.getId}, ignore and return the existing batch.")
265+
batch.setBatchInfo(Map(KYUUBI_BATCH_DUPLICATED_KEY -> "true").asJava)
266+
batch
229267
}
230268

231269
@ApiResponse(

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package org.apache.kyuubi.server.metadata
2020
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
2121
import java.util.concurrent.atomic.AtomicInteger
2222

23+
import scala.collection.JavaConverters._
24+
2325
import org.apache.kyuubi.{KyuubiException, Logging}
2426
import org.apache.kyuubi.client.api.v1.dto.Batch
2527
import org.apache.kyuubi.config.KyuubiConf
@@ -29,7 +31,7 @@ import org.apache.kyuubi.operation.OperationState
2931
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
3032
import org.apache.kyuubi.service.AbstractService
3133
import org.apache.kyuubi.session.SessionType
32-
import org.apache.kyuubi.util.{ClassUtils, ThreadUtils}
34+
import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils}
3335

3436
class MetadataManager extends AbstractService("MetadataManager") {
3537
import MetadataManager._
@@ -105,11 +107,8 @@ class MetadataManager extends AbstractService("MetadataManager") {
105107
}
106108

107109
protected def unrecoverableDBErr(cause: Throwable): Boolean = {
108-
val unrecoverableKeywords = Seq(
109-
"duplicate key value in a unique or primary key constraint or unique index", // Derby
110-
"Duplicate entry" // MySQL
111-
)
112-
unrecoverableKeywords.exists(cause.getMessage.contains)
110+
// cover other cases in the future
111+
JdbcUtils.isDuplicatedKeyDBErr(cause)
113112
}
114113

115114
def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
@@ -334,6 +333,7 @@ object MetadataManager extends Logging {
334333
batchMetadata.kyuubiInstance,
335334
batchState,
336335
batchMetadata.createTime,
337-
batchMetadata.endTime)
336+
batchMetadata.endTime,
337+
Map.empty[String, String].asJava)
338338
}
339339
}

0 commit comments

Comments
 (0)