Skip to content

Commit 85e7343

Browse files
committed
Allow user to provide batch id on creating batch
1 parent 15a83e1 commit 85e7343

File tree

8 files changed

+146
-65
lines changed

8 files changed

+146
-65
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ object KyuubiReservedKeys {
2626
final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
2727
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
2828
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
29+
final val KYUUBI_BATCH_ID_KEY = "kyuubi.batch.id"
30+
final val KYUUBI_BATCH_DUPLICATED_KEY = "kyuubi.batch.duplicated"
2931
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
3032
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
3133
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"

kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package org.apache.kyuubi.client.api.v1.dto;
1919

20-
import java.util.Objects;
2120
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
2221
import org.apache.commons.lang3.builder.ToStringStyle;
2322

23+
import java.util.Collections;
24+
import java.util.Map;
25+
import java.util.Objects;
26+
2427
public class Batch {
2528
private String id;
2629
private String user;
@@ -35,6 +38,7 @@ public class Batch {
3538
private String state;
3639
private long createTime;
3740
private long endTime;
41+
private Map<String, String> batchInfo = Collections.emptyMap();
3842

3943
public Batch() {}
4044

@@ -51,7 +55,8 @@ public Batch(
5155
String kyuubiInstance,
5256
String state,
5357
long createTime,
54-
long endTime) {
58+
long endTime,
59+
Map<String, String> batchInfo) {
5560
this.id = id;
5661
this.user = user;
5762
this.batchType = batchType;
@@ -65,6 +70,7 @@ public Batch(
6570
this.state = state;
6671
this.createTime = createTime;
6772
this.endTime = endTime;
73+
this.batchInfo = batchInfo;
6874
}
6975

7076
public String getId() {
@@ -171,6 +177,17 @@ public void setEndTime(long endTime) {
171177
this.endTime = endTime;
172178
}
173179

180+
public Map<String, String> getBatchInfo() {
181+
if (batchInfo == null) {
182+
return Collections.emptyMap();
183+
}
184+
return batchInfo;
185+
}
186+
187+
public void setBatchInfo(Map<String, String> batchInfo) {
188+
this.batchInfo = batchInfo;
189+
}
190+
174191
@Override
175192
public boolean equals(Object o) {
176193
if (this == o) return true;

kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ public class BatchRequest {
2929
private String resource;
3030
private String className;
3131
private String name;
32-
private Map<String, String> conf;
33-
private List<String> args;
32+
private Map<String, String> conf = Collections.emptyMap();
33+
private List<String> args = Collections.emptyList();
3434

3535
public BatchRequest() {}
3636

@@ -54,8 +54,6 @@ public BatchRequest(String batchType, String resource, String className, String
5454
this.resource = resource;
5555
this.className = className;
5656
this.name = name;
57-
this.conf = Collections.emptyMap();
58-
this.args = Collections.emptyList();
5957
}
6058

6159
public String getBatchType() {

kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,35 +45,31 @@ public static CloseBatchResponse generateTestCloseBatchResp() {
4545
}
4646

4747
public static Batch generateTestBatch(String id) {
48-
Batch batch =
49-
new Batch(
50-
id,
51-
TEST_USERNAME,
52-
"spark",
53-
"batch_name",
54-
0,
55-
id,
56-
null,
57-
"RUNNING",
58-
null,
59-
"192.168.31.130:64573",
60-
"RUNNING",
61-
BATCH_CREATE_TIME,
62-
0);
63-
64-
return batch;
48+
return new Batch(
49+
id,
50+
TEST_USERNAME,
51+
"spark",
52+
"batch_name",
53+
0,
54+
id,
55+
null,
56+
"RUNNING",
57+
null,
58+
"192.168.31.130:64573",
59+
"RUNNING",
60+
BATCH_CREATE_TIME,
61+
0,
62+
Collections.emptyMap());
6563
}
6664

6765
public static BatchRequest generateTestBatchRequest() {
68-
BatchRequest batchRequest =
69-
new BatchRequest(
70-
"spark",
71-
"/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar",
72-
"org.apache.kyuubi.engine.spark.SparkSQLEngine",
73-
"test_batch",
74-
Collections.singletonMap("spark.driver.memory", "16m"),
75-
Collections.emptyList());
76-
return batchRequest;
66+
return new BatchRequest(
67+
"spark",
68+
"/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar",
69+
"org.apache.kyuubi.engine.spark.SparkSQLEngine",
70+
"test_batch",
71+
Collections.singletonMap("spark.driver.memory", "16m"),
72+
Collections.emptyList());
7773
}
7874

7975
public static GetBatchesResponse generateTestBatchesResponse() {
@@ -87,9 +83,8 @@ public static GetBatchesResponse generateTestBatchesResponse() {
8783
public static OperationLog generateTestOperationLog() {
8884
List<String> logs =
8985
Arrays.asList(
90-
"13:15:13.523 INFO org.apache.curator.framework.state."
91-
+ "ConnectionStateManager: State change: CONNECTED",
92-
"13:15:13.528 INFO org.apache.kyuubi." + "engine.EngineRef: Launching engine:");
86+
"13:15:13.523 INFO ConnectionStateManager: State change: CONNECTED",
87+
"13:15:13.528 INFO EngineRef: Launching engine:");
9388
return new OperationLog(logs, 2);
9489
}
9590
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package org.apache.kyuubi.server.api.v1
1919

2020
import java.io.InputStream
2121
import java.util
22-
import java.util.{Collections, Locale}
22+
import java.util.{Collections, Locale, UUID}
2323
import java.util.concurrent.ConcurrentHashMap
2424
import javax.ws.rs._
2525
import javax.ws.rs.core.MediaType
2626
import javax.ws.rs.core.Response.Status
2727

2828
import scala.collection.JavaConverters._
29+
import scala.util.{Failure, Success, Try}
2930
import scala.util.control.NonFatal
3031

3132
import io.swagger.v3.oas.annotations.media.{Content, Schema}
@@ -105,7 +106,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
105106
session.connectionUrl,
106107
batchOpStatus.state.toString,
107108
session.createTime,
108-
batchOpStatus.completed)
109+
batchOpStatus.completed,
110+
Map.empty[String, String].asJava)
109111
}
110112

111113
private def buildBatch(
@@ -142,7 +144,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
142144
metadata.kyuubiInstance,
143145
currentBatchState,
144146
metadata.createTime,
145-
metadata.endTime)
147+
metadata.endTime,
148+
Map.empty[String, String].asJava)
146149
}.getOrElse(MetadataManager.buildBatch(metadata))
147150
}
148151

@@ -210,22 +213,60 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
210213
}
211214
request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))
212215

213-
val userName = fe.getSessionUser(request.getConf.asScala.toMap)
214-
val ipAddress = fe.getIpAddress
215-
request.setConf(
216-
(request.getConf.asScala ++ Map(
217-
KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
218-
KYUUBI_CLIENT_IP_KEY -> ipAddress,
219-
KYUUBI_SERVER_IP_KEY -> fe.host,
220-
KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
221-
KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
222-
val sessionHandle = sessionManager.openBatchSession(
223-
userName,
224-
"anonymous",
225-
ipAddress,
226-
request.getConf.asScala.toMap,
227-
request)
228-
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
216+
val userProvidedBatchId = request.getConf.asScala.get(KYUUBI_BATCH_ID_KEY)
217+
userProvidedBatchId.foreach { batchId =>
218+
try UUID.fromString(batchId)
219+
catch {
220+
case NonFatal(e) =>
221+
throw new IllegalArgumentException(s"$KYUUBI_BATCH_ID_KEY=$batchId must be an UUID", e)
222+
}
223+
}
224+
225+
userProvidedBatchId.flatMap { batchId =>
226+
Option(sessionManager.getBatchFromMetadataStore(batchId))
227+
} match {
228+
case Some(batch) =>
229+
duplicatedBatch(batch)
230+
case None =>
231+
val userName = fe.getSessionUser(request.getConf.asScala.toMap)
232+
val ipAddress = fe.getIpAddress
233+
val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)
234+
request.setConf(
235+
(request.getConf.asScala ++ Map(
236+
KYUUBI_BATCH_ID_KEY -> batchId,
237+
KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
238+
KYUUBI_CLIENT_IP_KEY -> ipAddress,
239+
KYUUBI_SERVER_IP_KEY -> fe.host,
240+
KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
241+
KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
242+
243+
def duplicatedBatchId(errorMsg: String): Boolean = {
244+
// TODO match the error message for Derby and MySQL
245+
errorMsg.contains("duplicated")
246+
}
247+
248+
Try {
249+
sessionManager.openBatchSession(
250+
userName,
251+
"anonymous",
252+
ipAddress,
253+
request.getConf.asScala.toMap,
254+
request)
255+
} match {
256+
case Success(sessionHandle) =>
257+
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
258+
case Failure(cause) if duplicatedBatchId(cause.getMessage) =>
259+
val batch = sessionManager.getBatchFromMetadataStore(batchId)
260+
assert(batch != null, s"can not find duplicated batch $batchId from metadata store")
261+
duplicatedBatch(batch)
262+
}
263+
}
264+
}
265+
266+
private def duplicatedBatch(batch: Batch): Batch = {
267+
warn(s"duplicated submission: ${batch.getId}, ignore and return the existing batch.")
268+
batch.setBatchInfo(Map(KYUUBI_BATCH_DUPLICATED_KEY -> "true").asJava)
269+
batch
229270
}
230271

231272
@ApiResponse(

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package org.apache.kyuubi.server.metadata
2020
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
2121
import java.util.concurrent.atomic.AtomicInteger
2222

23+
import scala.collection.JavaConverters._
24+
2325
import org.apache.kyuubi.{KyuubiException, Logging}
2426
import org.apache.kyuubi.client.api.v1.dto.Batch
2527
import org.apache.kyuubi.config.KyuubiConf
@@ -319,6 +321,7 @@ object MetadataManager extends Logging {
319321
batchMetadata.kyuubiInstance,
320322
batchState,
321323
batchMetadata.createTime,
322-
batchMetadata.endTime)
324+
batchMetadata.endTime,
325+
Map.empty[String, String].asJava)
323326
}
324327
}

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717

1818
package org.apache.kyuubi.session
1919

20-
import java.util.UUID
21-
2220
import scala.collection.JavaConverters._
2321

2422
import org.apache.hive.service.rpc.thrift.TProtocolVersion
2523

2624
import org.apache.kyuubi.client.api.v1.dto.BatchRequest
2725
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
26+
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_ID_KEY
2827
import org.apache.kyuubi.engine.KyuubiApplicationManager
2928
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
3029
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -50,9 +49,10 @@ class KyuubiBatchSessionImpl(
5049
sessionManager) {
5150
override val sessionType: SessionType = SessionType.BATCH
5251

53-
override val handle: SessionHandle = recoveryMetadata.map { metadata =>
54-
SessionHandle(UUID.fromString(metadata.identifier))
55-
}.getOrElse(SessionHandle())
52+
override val handle: SessionHandle = {
53+
val batchId = recoveryMetadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
54+
SessionHandle.fromUUID(batchId)
55+
}
5656

5757
override def createTime: Long = recoveryMetadata.map(_.createTime).getOrElse(super.createTime)
5858

@@ -105,7 +105,7 @@ class KyuubiBatchSessionImpl(
105105
}
106106

107107
private val sessionEvent = KyuubiSessionEvent(this)
108-
recoveryMetadata.map(metadata => sessionEvent.engineId = metadata.engineId)
108+
recoveryMetadata.foreach(metadata => sessionEvent.engineId = metadata.engineId)
109109
EventBus.post(sessionEvent)
110110

111111
override def getSessionEvent: Option[KyuubiSessionEvent] = {

0 commit comments

Comments
 (0)