
Commit 6c6bb26
Merge branch 'apache:dev' into dev
ShawHee authored Sep 2, 2024
2 parents 64531cf + 232990d
Showing 105 changed files with 8,577 additions and 1,141 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/e2e.yml
@@ -110,6 +110,12 @@ jobs:
strategy:
matrix:
case:
- name: Flink118OnRemoteClusterDeployTest
class: org.apache.streampark.e2e.cases.Flink118OnRemoteClusterDeployTest
- name: Flink117OnRemoteClusterDeployTest
class: org.apache.streampark.e2e.cases.Flink117OnRemoteClusterDeployTest
- name: Flink116OnRemoteClusterDeployTest
class: org.apache.streampark.e2e.cases.Flink116OnRemoteClusterDeployTest
- name: EnvironmentTest
class: org.apache.streampark.e2e.cases.EnvironmentTest
- name: AlarmTest
SparkDevelopmentMode.java
@@ -20,7 +20,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

-/** The flink deployment mode enum. */
+/** The spark deployment mode enum. */
public enum SparkDevelopmentMode {

/** Unknown type replace null */
@@ -29,8 +29,11 @@ public enum SparkDevelopmentMode {
/** custom code */
CUSTOM_CODE("Custom Code", 1),

-    /** spark SQL */
-    SPARK_SQL("Spark SQL", 2);
+    /** Spark SQL */
+    SPARK_SQL("Spark SQL", 2),
+
+    /** PySpark mode */
+    PYSPARK("Python Spark", 3);

private final String name;

@@ -44,17 +44,22 @@ public enum SparkDevelopmentMode {
/**
* Try to resolve the mode value into {@link SparkDevelopmentMode}.
*
-   * @param value The mode value of potential flink deployment mode.
-   * @return The parsed flink deployment mode.
+   * @param value The mode value of potential spark deployment mode.
+   * @return The parsed spark deployment mode.
*/
@Nonnull
public static SparkDevelopmentMode valueOf(@Nullable Integer value) {
-        for (SparkDevelopmentMode flinkDevelopmentMode : values()) {
-            if (flinkDevelopmentMode.mode.equals(value)) {
-                return flinkDevelopmentMode;
+        for (SparkDevelopmentMode sparkDevelopmentMode : values()) {
+            if (sparkDevelopmentMode.mode.equals(value)) {
+                return sparkDevelopmentMode;
}
}
return SparkDevelopmentMode.UNKNOWN;
}

/** Get the mode value of the current {@link SparkDevelopmentMode} enum. */
@Nonnull
public Integer getMode() {
return mode;
}
}
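
For context, a minimal usage sketch of the extended enum (the demo object name is hypothetical and assumes streampark-common is on the classpath): valueOf resolves an integer mode value, including the new PYSPARK entry, and falls back to UNKNOWN instead of throwing:

import org.apache.streampark.common.enums.SparkDevelopmentMode

object SparkModeDemo extends App {
  // Mode 3 is the PySpark entry introduced in this commit
  println(SparkDevelopmentMode.valueOf(Integer.valueOf(3)))  // PYSPARK
  // Unmapped values resolve to UNKNOWN rather than throwing
  println(SparkDevelopmentMode.valueOf(Integer.valueOf(99))) // UNKNOWN
  // getMode exposes the numeric value, e.g. for persistence
  println(SparkDevelopmentMode.PYSPARK.getMode)              // 3
}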
SparkSqlValidationFailedType.java (new file)
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.enums;

import javax.annotation.Nullable;

/** Spark SQL validation failed type enum. */
public enum SparkSqlValidationFailedType {

/** Basic verification failed (e.g. the SQL is null or empty) */
VERIFY_FAILED(1),

/** syntax error */
SYNTAX_ERROR(2),

/** unsupported dialect */
UNSUPPORTED_DIALECT(3),

/** unsupported sql command */
UNSUPPORTED_SQL(4),

/** SQL does not end with ";" */
ENDS_WITH(5),

/** Class exception */
CLASS_ERROR(6);

private final int failedType;

SparkSqlValidationFailedType(int failedType) {
this.failedType = failedType;
}

/**
* Try to resolve the given spark SQL validation failed type value into a known {@link
* SparkSqlValidationFailedType} enum.
*/
    @Nullable
    public static SparkSqlValidationFailedType of(@Nullable Integer value) {
        // Guard against auto-unboxing a null Integer in the comparison below
        if (value == null) {
            return null;
        }
        for (SparkSqlValidationFailedType type : values()) {
            if (type.failedType == value) {
                return type;
            }
        }
        return null;
    }

public int getFailedType() {
return failedType;
}
}
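
A similarly hedged sketch for the new failure-type enum (the demo object name is illustrative, not part of the commit): of() maps a stored failure code back to its constant and yields null for codes it does not know:

import org.apache.streampark.common.enums.SparkSqlValidationFailedType

object ValidationCodeDemo extends App {
  val syntax = SparkSqlValidationFailedType.of(Integer.valueOf(2))
  println(syntax)               // SYNTAX_ERROR
  println(syntax.getFailedType) // 2
  // Unknown codes return null, so callers must null-check
  println(SparkSqlValidationFailedType.of(Integer.valueOf(42))) // null
}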
ConfigKeys.scala
@@ -65,6 +65,33 @@ object ConfigKeys {

val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"

val KEY_SPARK_DRIVER_CORES = "spark.driver.cores"

val KEY_SPARK_DRIVER_MEMORY = "spark.driver.memory"

val KEY_SPARK_EXECUTOR_INSTANCES = "spark.executor.instances"

val KEY_SPARK_EXECUTOR_CORES = "spark.executor.cores"

val KEY_SPARK_EXECUTOR_MEMORY = "spark.executor.memory"

val KEY_SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"

val KEY_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"

val KEY_SPARK_YARN_QUEUE = "spark.yarn.queue"

val KEY_SPARK_YARN_QUEUE_NAME = "yarnQueueName"

val KEY_SPARK_YARN_QUEUE_LABEL = "yarnQueueLabel"

val KEY_SPARK_YARN_AM_NODE_LABEL = "spark.yarn.am.nodeLabelExpression"

val KEY_SPARK_YARN_EXECUTOR_NODE_LABEL = "spark.yarn.executor.nodeLabelExpression"

def KEY_SPARK_SQL(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}sql"

/** about config flink */
def KEY_APP_CONF(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}conf"
@@ -87,7 +114,7 @@ object ConfigKeys {

val KEY_FLINK_TABLE_PREFIX = "flink.table."

-  val KEY_SPARK_PROPERTY_PREFIX = "spark.property."
+  val KEY_SPARK_PROPERTY_PREFIX = "spark."

val KEY_APP_PREFIX = "app."

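The last hunk above changes KEY_SPARK_PROPERTY_PREFIX from "spark.property." to "spark.", i.e. user-supplied properties are now keyed by native Spark names such as spark.executor.memory. A minimal sketch (the map and object name are hypothetical, not from this commit) of how such a prefix is typically used to filter application properties:

object PrefixFilterDemo extends App {
  val KEY_SPARK_PROPERTY_PREFIX = "spark."
  val appProperties = Map(
    "spark.executor.memory" -> "2g",
    "spark.executor.cores" -> "2",
    "app.name" -> "demo")
  // Keep only the keys that Spark itself understands
  val sparkConf = appProperties.filter { case (k, _) =>
    k.startsWith(KEY_SPARK_PROPERTY_PREFIX)
  }
  sparkConf.foreach { case (k, v) => println(s"$k=$v") }
}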
PropertiesUtils.scala
@@ -37,6 +37,12 @@ object PropertiesUtils extends Logger {

private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")

private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")

// scalastyle:off
private[this] lazy val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
// scalastyle:on

private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"

private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
@@ -380,4 +386,48 @@
new JavaHashMap[String, JavaMap[String, String]](map)
}

/** Extract Spark configuration from sparkApplication.appProperties. */
@Nonnull def extractSparkPropertiesAsJava(properties: String): JavaMap[String, String] =
new JavaHashMap[String, String](extractSparkProperties(properties))

@Nonnull def extractSparkProperties(properties: String): Map[String, String] = {
if (StringUtils.isEmpty(properties)) Map.empty[String, String]
else {
val map = mutable.Map[String, String]()
properties.split("(\\s)*(--conf|-c)(\\s)+") match {
case d if Utils.isNotEmpty(d) =>
d.foreach(x => {
if (x.nonEmpty) {
val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
if (p.matches) {
map += p.group(1).trim -> p.group(2).trim
}
}
})
case _ =>
}
map.toMap
}
}

/** Extract Spark arguments from sparkApplication.appArgs. */
@Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] =
new JavaArrayList[String](extractSparkArguments(arguments))

  @Nonnull def extractSparkArguments(arguments: String): List[String] = {
    if (StringUtils.isEmpty(arguments)) List.empty[String]
    else {
      // Accumulate into a mutable buffer; appending to an immutable List
      // with :+ would discard the result and always return an empty list.
      val list = mutable.ListBuffer[String]()
      arguments.split(SPARK_ARGUMENT_REGEXP) match {
        case d if Utils.isNotEmpty(d) =>
          d.foreach(x => if (x.nonEmpty) list += x)
        case _ =>
      }
      list.toList
    }
  }
}
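
A quick sketch exercising the two new extractors (the import path and demo object name are assumed; expected outputs follow from the regexes above):

import org.apache.streampark.common.util.PropertiesUtils

object ExtractDemo extends App {
  // --conf and -c both act as property separators
  val props = PropertiesUtils.extractSparkProperties(
    "--conf spark.executor.memory=2g -c spark.executor.cores=2")
  println(props) // Map(spark.executor.memory -> 2g, spark.executor.cores -> 2)

  // Whitespace splits arguments, but quoted segments stay intact
  val args = PropertiesUtils.extractSparkArguments(
    "--input /tmp/data --name \"my job\"")
  println(args) // List(--input, /tmp/data, --name, my job)
}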
