Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.kyuubi.kubernetes.test.spark

import java.util.UUID

import scala.collection.JavaConverters._
import scala.concurrent.duration._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.net.NetUtils

import org.apache.kyuubi.{BatchTestHelper, KyuubiException, Logging, Utils, WithKyuubiServer, WithSimpleDFSService}
import org.apache.kyuubi._
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_HOST
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, KubernetesApplicationOperation}
Expand Down Expand Up @@ -134,7 +137,8 @@ class KyuubiOperationKubernetesClusterClientModeSuite
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]

test("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
val batchRequest = newSparkBatchRequest(conf.getAll)
val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))

val sessionHandle = sessionManager.openBatchSession(
"kyuubi",
Expand Down Expand Up @@ -193,7 +197,8 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
"spark.kubernetes.driver.pod.name",
driverPodNamePrefix + "-" + System.currentTimeMillis())

val batchRequest = newSparkBatchRequest(conf.getAll)
val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))

val sessionHandle = sessionManager.openBatchSession(
"runner",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,12 @@ object JdbcUtils extends Logging {
case _ => "(empty)"
}
}

/**
 * Returns true if the given throwable looks like a duplicated-key violation
 * reported by one of the supported metadata-store RDBMS backends.
 *
 * Detection is message-based because each JDBC driver raises its own
 * exception class; only the error text is portable across backends.
 *
 * @param cause the throwable raised by the JDBC layer
 * @return true when the message matches a known duplicated-key pattern
 */
def isDuplicatedKeyDBErr(cause: Throwable): Boolean = {
  val duplicatedKeyKeywords = Seq(
    "duplicate key value in a unique or primary key constraint or unique index", // Derby
    "Duplicate entry" // MySQL
  )
  // getMessage may be null (e.g. exceptions constructed without a message);
  // guard with Option to avoid an NPE and treat that case as "not duplicated".
  Option(cause.getMessage).exists(msg => duplicatedKeyKeywords.exists(msg.contains))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kyuubi.client.api.v1.dto;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
Expand All @@ -35,6 +37,7 @@ public class Batch {
private String state;
private long createTime;
private long endTime;
private Map<String, String> batchInfo = Collections.emptyMap();

public Batch() {}

Expand All @@ -51,7 +54,8 @@ public Batch(
String kyuubiInstance,
String state,
long createTime,
long endTime) {
long endTime,
Map<String, String> batchInfo) {
this.id = id;
this.user = user;
this.batchType = batchType;
Expand All @@ -65,6 +69,7 @@ public Batch(
this.state = state;
this.createTime = createTime;
this.endTime = endTime;
this.batchInfo = batchInfo;
}

public String getId() {
Expand Down Expand Up @@ -171,6 +176,17 @@ public void setEndTime(long endTime) {
this.endTime = endTime;
}

/**
 * Returns the auxiliary batch info map; never null.
 *
 * <p>Falls back to an immutable empty map when the field was nulled out
 * (e.g. by deserialization or an explicit {@code setBatchInfo(null)}).
 */
public Map<String, String> getBatchInfo() {
  return batchInfo == null ? Collections.emptyMap() : batchInfo;
}

/** Replaces the auxiliary batch info map; a null value is tolerated (the getter normalizes it). */
public void setBatchInfo(Map<String, String> batchInfo) {
  this.batchInfo = batchInfo;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class BatchRequest {
private String resource;
private String className;
private String name;
private Map<String, String> conf;
private List<String> args;
private Map<String, String> conf = Collections.emptyMap();
private List<String> args = Collections.emptyList();

public BatchRequest() {}

Expand All @@ -54,8 +54,6 @@ public BatchRequest(String batchType, String resource, String className, String
this.resource = resource;
this.className = className;
this.name = name;
this.conf = Collections.emptyMap();
this.args = Collections.emptyList();
}

public String getBatchType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import org.apache.kyuubi.client.api.v1.dto.Batch;

public final class BatchUtils {
/** The batch has not been submitted to resource manager yet. */
Expand All @@ -40,6 +41,10 @@ public final class BatchUtils {
public static List<String> terminalBatchStates =
Arrays.asList(FINISHED_STATE, ERROR_STATE, CANCELED_STATE);

public static String KYUUBI_BATCH_ID_KEY = "kyuubi.batch.id";

public static String KYUUBI_BATCH_DUPLICATED_KEY = "kyuubi.batch.duplicated";

/**
 * Tells whether the given state string denotes the PENDING batch state.
 *
 * <p>Comparison is case-insensitive and null-safe: a null argument yields false.
 */
public static boolean isPendingState(String state) {
  // Invoke equalsIgnoreCase on the non-null constant so a null argument is safe.
  boolean pending = PENDING_STATE.equalsIgnoreCase(state);
  return pending;
}
Expand All @@ -55,4 +60,8 @@ public static boolean isFinishedState(String state) {
/**
 * Tells whether the given state string denotes a terminal batch state
 * (FINISHED, ERROR or CANCELED).
 *
 * <p>Null-safe: a null argument yields false. Matching is case-insensitive
 * via an upper-case normalization with a fixed locale.
 */
public static boolean isTerminalState(String state) {
  if (state == null) {
    return false;
  }
  // Locale.ROOT avoids locale-sensitive case-mapping surprises (e.g. Turkish 'i').
  String normalized = state.toUpperCase(Locale.ROOT);
  return terminalBatchStates.contains(normalized);
}

/**
 * Tells whether the server flagged this batch as a duplicated submission.
 *
 * <p>Reads the {@code KYUUBI_BATCH_DUPLICATED_KEY} entry from the batch info map;
 * any value other than "true" (case-insensitive), including absence, yields false.
 */
public static boolean isDuplicatedSubmission(Batch batch) {
  String flag = batch.getBatchInfo().get(KYUUBI_BATCH_DUPLICATED_KEY);
  // equalsIgnoreCase on the literal is null-safe when the key is absent.
  return "true".equalsIgnoreCase(flag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,31 @@ public static CloseBatchResponse generateTestCloseBatchResp() {
}

/**
 * Builds a deterministic RUNNING-state {@link Batch} fixture for tests.
 *
 * <p>NOTE: this span in the source contained both the pre- and post-diff bodies
 * concatenated (a diff-extraction artifact); this is the single coherent version,
 * using the 14-arg constructor that takes the batchInfo map.
 *
 * @param id used both as the batch id and the application id
 * @return a Batch with fixed test values and an empty batchInfo map
 */
public static Batch generateTestBatch(String id) {
  return new Batch(
      id,
      TEST_USERNAME,
      "spark",
      "batch_name",
      0,
      id,
      null,
      "RUNNING",
      null,
      "192.168.31.130:64573",
      "RUNNING",
      BATCH_CREATE_TIME,
      0,
      Collections.emptyMap());
}

/**
 * Builds a deterministic {@link BatchRequest} fixture for tests.
 *
 * <p>NOTE: this span in the source contained both the pre- and post-diff bodies
 * concatenated (a diff-extraction artifact); this is the single coherent version,
 * returning the constructed request directly instead of via a local variable.
 *
 * @return a spark BatchRequest with a fixed resource, class, name and conf
 */
public static BatchRequest generateTestBatchRequest() {
  return new BatchRequest(
      "spark",
      "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar",
      "org.apache.kyuubi.engine.spark.SparkSQLEngine",
      "test_batch",
      Collections.singletonMap("spark.driver.memory", "16m"),
      Collections.emptyList());
}

public static GetBatchesResponse generateTestBatchesResponse() {
Expand All @@ -87,9 +83,8 @@ public static GetBatchesResponse generateTestBatchesResponse() {
public static OperationLog generateTestOperationLog() {
List<String> logs =
Arrays.asList(
"13:15:13.523 INFO org.apache.curator.framework.state."
+ "ConnectionStateManager: State change: CONNECTED",
"13:15:13.528 INFO org.apache.kyuubi." + "engine.EngineRef: Launching engine:");
"13:15:13.523 INFO ConnectionStateManager: State change: CONNECTED",
"13:15:13.528 INFO EngineRef: Launching engine:");
return new OperationLog(logs, 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package org.apache.kyuubi.server.api.v1

import java.io.InputStream
import java.util
import java.util.{Collections, Locale}
import java.util.{Collections, Locale, UUID}
import java.util.concurrent.ConcurrentHashMap
import javax.ws.rs._
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response.Status

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import io.swagger.v3.oas.annotations.media.{Content, Schema}
Expand All @@ -36,6 +37,7 @@ import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDat
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys._
import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
Expand All @@ -45,6 +47,7 @@ import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.JdbcUtils

@Tag(name = "Batch")
@Produces(Array(MediaType.APPLICATION_JSON))
Expand Down Expand Up @@ -105,7 +108,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
session.connectionUrl,
batchOpStatus.state.toString,
session.createTime,
batchOpStatus.completed)
batchOpStatus.completed,
Map.empty[String, String].asJava)
}

private def buildBatch(
Expand Down Expand Up @@ -142,7 +146,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
metadata.kyuubiInstance,
currentBatchState,
metadata.createTime,
metadata.endTime)
metadata.endTime,
Map.empty[String, String].asJava)
}.getOrElse(MetadataManager.buildBatch(metadata))
}

Expand Down Expand Up @@ -210,22 +215,55 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
}
request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))

val userName = fe.getSessionUser(request.getConf.asScala.toMap)
val ipAddress = fe.getIpAddress
request.setConf(
(request.getConf.asScala ++ Map(
KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
KYUUBI_CLIENT_IP_KEY -> ipAddress,
KYUUBI_SERVER_IP_KEY -> fe.host,
KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
val sessionHandle = sessionManager.openBatchSession(
userName,
"anonymous",
ipAddress,
request.getConf.asScala.toMap,
request)
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
val userProvidedBatchId = request.getConf.asScala.get(KYUUBI_BATCH_ID_KEY)
userProvidedBatchId.foreach { batchId =>
try UUID.fromString(batchId)
catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"$KYUUBI_BATCH_ID_KEY=$batchId must be an UUID", e)
}
}

userProvidedBatchId.flatMap { batchId =>
Option(sessionManager.getBatchFromMetadataStore(batchId))
} match {
case Some(batch) =>
markDuplicated(batch)
case None =>
val userName = fe.getSessionUser(request.getConf.asScala.toMap)
val ipAddress = fe.getIpAddress
val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)
request.setConf(
(request.getConf.asScala ++ Map(
KYUUBI_BATCH_ID_KEY -> batchId,
KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
KYUUBI_CLIENT_IP_KEY -> ipAddress,
KYUUBI_SERVER_IP_KEY -> fe.host,
KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)

Try {
sessionManager.openBatchSession(
userName,
"anonymous",
ipAddress,
request.getConf.asScala.toMap,
request)
} match {
case Success(sessionHandle) =>
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
case Failure(cause) if JdbcUtils.isDuplicatedKeyDBErr(cause) =>
val batch = sessionManager.getBatchFromMetadataStore(batchId)
assert(batch != null, s"can not find duplicated batch $batchId from metadata store")
markDuplicated(batch)
}
}
}

/**
 * Marks the given existing batch as a duplicated submission and returns it.
 *
 * Sets KYUUBI_BATCH_DUPLICATED_KEY -> "true" in the batch's info map so that
 * clients (see BatchUtils.isDuplicatedSubmission) can detect that this response
 * refers to an already-submitted batch rather than a newly created one.
 * NOTE(review): mutates the passed-in Batch in place and returns the same instance.
 */
private def markDuplicated(batch: Batch): Batch = {
  warn(s"duplicated submission: ${batch.getId}, ignore and return the existing batch.")
  batch.setBatchInfo(Map(KYUUBI_BATCH_DUPLICATED_KEY -> "true").asJava)
  batch
}

@ApiResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.kyuubi.server.metadata
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._

import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.client.api.v1.dto.Batch
import org.apache.kyuubi.config.KyuubiConf
Expand All @@ -29,7 +31,7 @@ import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.SessionType
import org.apache.kyuubi.util.{ClassUtils, ThreadUtils}
import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils}

class MetadataManager extends AbstractService("MetadataManager") {
import MetadataManager._
Expand Down Expand Up @@ -105,11 +107,8 @@ class MetadataManager extends AbstractService("MetadataManager") {
}

/**
 * Decides whether a metadata-store DB error is unrecoverable, i.e. retrying
 * the same insert cannot succeed (currently: duplicated-key violations).
 *
 * NOTE: this span in the source contained both the pre-diff inline keyword list
 * and the post-diff delegation concatenated (a diff-extraction artifact); this
 * is the single coherent post-diff version delegating to the shared helper.
 */
protected def unrecoverableDBErr(cause: Throwable): Boolean = {
  // cover other cases in the future
  JdbcUtils.isDuplicatedKeyDBErr(cause)
}

def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
Expand Down Expand Up @@ -334,6 +333,7 @@ object MetadataManager extends Logging {
batchMetadata.kyuubiInstance,
batchState,
batchMetadata.createTime,
batchMetadata.endTime)
batchMetadata.endTime,
Map.empty[String, String].asJava)
}
}
Loading