Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wForget committed Mar 8, 2024
1 parent 08b8b1d commit 845b27f
Showing 1 changed file with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,22 @@ object FlinkEngineUtils extends Logging {
val libDirs: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_LIBRARY))
.getOrElse(JCollections.emptyList())
val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
if (FLINK_RUNTIME_VERSION === "1.16") {
if (FLINK_RUNTIME_VERSION >= "1.19") {
invokeAs[DefaultContext](
classOf[DefaultContext],
"load",
(classOf[Configuration], flinkConf),
(classOf[JList[URL]], dependencies),
(classOf[Boolean], JBoolean.TRUE))
} else if (FLINK_RUNTIME_VERSION >= "1.17") {
invokeAs[DefaultContext](
classOf[DefaultContext],
"load",
(classOf[Configuration], flinkConf),
(classOf[JList[URL]], dependencies),
(classOf[Boolean], JBoolean.TRUE),
(classOf[Boolean], JBoolean.FALSE))
} else if (FLINK_RUNTIME_VERSION === "1.16") {
val commandLines: JList[CustomCommandLine] =
Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
DynConstructors.builder()
Expand All @@ -120,14 +135,6 @@ object FlinkEngineUtils extends Logging {
.build()
.newInstance(flinkConf, commandLines)
.asInstanceOf[DefaultContext]
} else if (FLINK_RUNTIME_VERSION >= "1.17") {
invokeAs[DefaultContext](
classOf[DefaultContext],
"load",
(classOf[Configuration], flinkConf),
(classOf[JList[URL]], dependencies),
(classOf[Boolean], JBoolean.TRUE),
(classOf[Boolean], JBoolean.FALSE))
} else {
throw new KyuubiException(
s"Flink version ${EnvironmentInformation.getVersion} are not supported currently.")
Expand Down

0 comments on commit 845b27f

Please sign in to comment.