Skip to content

Commit

Permalink
[KYUUBI #6134] Support Flink 1.19
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

This pull request fixes #6134

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6141 from wForget/KYUUBI-6134.

Closes #6134

c8ee7f0 [wforget] remove flink-1.19.0-rc2 resource and flink 1.19 ga
efb3d8a [wforget] fix
845b27f [wforget] fix
08b8b1d [wforget] dev
c9f6917 [wforget] fix
b93744d [wforget] [KYUUBI #6134] Support Flink 1.19

Authored-by: wforget <643348094@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
wForget authored and pan3793 committed Mar 11, 2024
1 parent 32660d7 commit 78e56af
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ object FlinkEngineUtils extends Logging {

val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options)

private def SUPPORTED_FLINK_VERSIONS = Set("1.16", "1.17", "1.18").map(SemanticVersion.apply)
private def SUPPORTED_FLINK_VERSIONS =
Set("1.16", "1.17", "1.18", "1.19").map(SemanticVersion.apply)

val FLINK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(EnvironmentInformation.getVersion)

Expand Down Expand Up @@ -111,7 +112,22 @@ object FlinkEngineUtils extends Logging {
val libDirs: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_LIBRARY))
.getOrElse(JCollections.emptyList())
val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
if (FLINK_RUNTIME_VERSION === "1.16") {
if (FLINK_RUNTIME_VERSION >= "1.19") {
invokeAs[DefaultContext](
classOf[DefaultContext],
"load",
(classOf[Configuration], flinkConf),
(classOf[JList[URL]], dependencies),
(classOf[Boolean], JBoolean.TRUE))
} else if (FLINK_RUNTIME_VERSION >= "1.17") {
invokeAs[DefaultContext](
classOf[DefaultContext],
"load",
(classOf[Configuration], flinkConf),
(classOf[JList[URL]], dependencies),
(classOf[Boolean], JBoolean.TRUE),
(classOf[Boolean], JBoolean.FALSE))
} else if (FLINK_RUNTIME_VERSION === "1.16") {
val commandLines: JList[CustomCommandLine] =
Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
DynConstructors.builder()
Expand All @@ -122,14 +138,6 @@ object FlinkEngineUtils extends Logging {
.build()
.newInstance(flinkConf, commandLines)
.asInstanceOf[DefaultContext]
} else if (FLINK_RUNTIME_VERSION >= "1.17") {
invokeAs[DefaultContext](
classOf[DefaultContext],
"load",
(classOf[Configuration], flinkConf),
(classOf[JList[URL]], dependencies),
(classOf[Boolean], JBoolean.TRUE),
(classOf[Boolean], JBoolean.FALSE))
} else {
throw new KyuubiException(
s"Flink version ${EnvironmentInformation.getVersion} are not supported currently.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,8 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
statement.getConnection.setSchema("db_a")
val changedSchema = statement.getConnection.getSchema
assert(changedSchema == "db_a")
// reset database to default
statement.getConnection.setSchema("default_database")
assert(statement.execute("drop database db_a"))
}
}
Expand Down

0 comments on commit 78e56af

Please sign in to comment.