Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster name in spark submit params #2467

Merged
merged 2 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private Builder() {
config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(FLINT_INDEX_STORE_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST);
config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT);
config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
Expand All @@ -77,6 +78,11 @@ public Builder className(String className) {
return this;
}

public Builder clusterName(String clusterName) {
config.put(FLINT_INDEX_STORE_CLUSTER_NAME_KEY, clusterName);
noCharger marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

public Builder dataSource(DataSourceMetadata metadata) {
if (DataSourceType.S3GLUE.equals(metadata.getConnector())) {
String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class SparkConstants {
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
// TODO should be replaced with mvn jar.
public static final String FLINT_DEFAULT_CLUSTER_NAME = "opensearch-cluster";
public static final String FLINT_DEFAULT_HOST = "localhost";
public static final String FLINT_DEFAULT_PORT = "9200";
public static final String FLINT_DEFAULT_SCHEME = "http";
Expand All @@ -45,6 +46,7 @@ public class SparkConstants {
public static final String SPARK_DRIVER_ENV_JAVA_HOME_KEY =
"spark.emr-serverless.driverEnv.JAVA_HOME";
public static final String SPARK_EXECUTOR_ENV_JAVA_HOME_KEY = "spark.executorEnv.JAVA_HOME";
public static final String FLINT_INDEX_STORE_CLUSTER_NAME_KEY = "spark.flint.clusterName";
public static final String FLINT_INDEX_STORE_HOST_KEY = "spark.datasource.flint.host";
public static final String FLINT_INDEX_STORE_PORT_KEY = "spark.datasource.flint.port";
public static final String FLINT_INDEX_STORE_SCHEME_KEY = "spark.datasource.flint.scheme";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));

String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query";
String clusterName = dispatchQueryRequest.getClusterName();
String jobName = clusterName + ":" + "non-index-query";
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();

Expand All @@ -79,6 +80,7 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.clusterName(clusterName)
.dataSource(context.getDataSourceMetadata())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
Session session = null;
String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query";
String clusterName = dispatchQueryRequest.getClusterName();
String jobName = clusterName + ":" + "non-index-query";
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();

Expand Down Expand Up @@ -98,6 +99,7 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.className(FLINT_SESSION_CLASS_NAME)
.clusterName(clusterName)
.dataSource(dataSourceMetadata)
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()),
tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public DispatchQueryResponse submit(

leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource()));

String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
String clusterName = dispatchQueryRequest.getClusterName();
String jobName = clusterName + ":" + "index-query";
IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails();
Map<String, String> tags = context.getTags();
tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName());
Expand All @@ -56,6 +57,7 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.clusterName(clusterName)
.dataSource(dataSourceMetadata)
.structuredStreaming(true)
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ private String constructExpectedSparkSubmitParameterString(
+ " --conf"
+ " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf"
+ " --conf spark.flint.clusterName=TEST_CLUSTER --conf"
+ " spark.datasource.flint.host=search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"
+ " --conf spark.datasource.flint.port=-1 --conf"
+ " spark.datasource.flint.scheme=https --conf spark.datasource.flint.auth="
Expand Down
Loading