[BUG][FLINK] Skip the max-rows limitation if the statement is not DQL #5814

Closed · wants to merge 6 commits

Changes from all commits
QueryResultFetchIterator.scala → IncrementalResultFetchIterator.scala
@@ -34,10 +34,12 @@ import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row

 import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
 import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
 import org.apache.kyuubi.operation.FetchIterator
+import org.apache.kyuubi.util.reflect.DynFields

-class QueryResultFetchIterator(
+class IncrementalResultFetchIterator(
     resultFetcher: ResultFetcher,
     maxRows: Int = 1000000,
     resultFetchTimeout: Duration = Duration.Inf) extends FetchIterator[Row] with Logging {
@@ -58,8 +60,17 @@ class QueryResultFetchIterator(

   val FETCH_INTERVAL_MS: Long = 1000

+  // for Flink 1.16 and below, isQueryResult is not supported
+  val isQueryResult: Boolean =
+    FlinkEngineUtils.FLINK_RUNTIME_VERSION < "1.17" ||
+      DynFields.builder
+        .hiddenImpl(classOf[ResultFetcher], "isQueryResult")
+        .build[Boolean](resultFetcher).get()
+
+  val effectiveMaxRows: Int = if (isQueryResult) maxRows else Int.MaxValue
+
   private val executor = Executors.newSingleThreadScheduledExecutor(
-    new ThreadFactoryBuilder().setNameFormat("flink-query-iterator-%d").setDaemon(true).build)
+    new ThreadFactoryBuilder().setNameFormat("flink-result-iterator-%d").setDaemon(true).build)

   implicit private val executionContext: ExecutionContextExecutor =
     ExecutionContext.fromExecutor(executor)
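The guard above pairs a version check with a reflective field read because ResultFetcher.isQueryResult only exists on Flink 1.17 and later; on older runtimes every statement is treated as a query, which preserves the previous capped behavior. A minimal standalone sketch of the same pattern, substituting plain java.lang.reflect for Kyuubi's DynFields helper and a hypothetical Fetcher class for Flink's ResultFetcher:

// Sketch only: Fetcher stands in for Flink's ResultFetcher; on a real Flink 1.16
// runtime the field simply does not exist, which is what the catch branch models.
object ReflectiveFlagSketch {
  class Fetcher {
    private val isQueryResult: Boolean = false // e.g. a DDL statement
  }

  // Read the flag if the runtime has it; default to true (keep the row cap)
  // when the field is missing, mirroring the Flink < 1.17 branch above.
  def readIsQueryResult(fetcher: Fetcher): Boolean =
    try {
      val f = fetcher.getClass.getDeclaredField("isQueryResult")
      f.setAccessible(true)
      f.getBoolean(fetcher)
    } catch {
      case _: NoSuchFieldException => true
    }

  def main(args: Array[String]): Unit = {
    val maxRows = 10
    val effectiveMaxRows =
      if (readIsQueryResult(new Fetcher)) maxRows else Int.MaxValue
    println(effectiveMaxRows) // Int.MaxValue: non-query results are uncapped
  }
}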
@@ -78,7 +89,7 @@
       // if no timeout is set, this would block until some rows are fetched
       debug(s"Fetching from result store with timeout $resultFetchTimeout ms")
       while (!fetched && !Thread.interrupted()) {
-        val rs = resultFetcher.fetchResults(token, maxRows - bufferedRows.length)
+        val rs = resultFetcher.fetchResults(token, effectiveMaxRows - bufferedRows.length)
         val flinkRs = new FlinkResultSet(rs)
         // TODO: replace string-based match when Flink 1.16 support is dropped
         flinkRs.getResultType.name() match {
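Each fetch round now requests only the remaining capacity under the effective cap, so the buffer never overshoots, and a non-DQL statement (effectiveMaxRows = Int.MaxValue) is never truncated. A self-contained sketch of that remaining-capacity loop, with a hypothetical batched source standing in for repeated fetchResults calls:

// Sketch only: batches stands in for successive fetchResults round trips.
object CappedFetchSketch {
  def fetchAll(batches: Iterator[Seq[Int]], effectiveMaxRows: Int): Vector[Int] = {
    var buffered = Vector.empty[Int]
    while (batches.hasNext && buffered.length < effectiveMaxRows) {
      val remaining = effectiveMaxRows - buffered.length // as in the diff above
      buffered ++= batches.next().take(remaining)
    }
    buffered
  }

  def main(args: Array[String]): Unit = {
    def batches = Iterator(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8))
    println(fetchAll(batches, 5))            // Vector(1, 2, 3, 4, 5): DQL-style cap
    println(fetchAll(batches, Int.MaxValue)) // all 8 rows: the cap is skipped
  }
}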
@@ -144,7 +155,7 @@
         debug(s"Fetching from buffered rows at pos $pos.")
         val row = bufferedRows(pos.toInt)
         pos += 1
-        if (pos >= maxRows) {
+        if (pos >= effectiveMaxRows) {
           hasNext = false
         }
         row
@@ ... @@
       if (hasNext) {
         val row = bufferedRows(pos.toInt)
         pos += 1
-        if (pos >= maxRows) {
+        if (pos >= effectiveMaxRows) {
           hasNext = false
         }
         row
ResultSet.scala
@@ -53,7 +53,7 @@ case class ResultSet(

   def close: Unit = {
     data match {
-      case queryIte: QueryResultFetchIterator => queryIte.close()
+      case incIte: IncrementalResultFetchIterator => incIte.close()
       case _ =>
     }
   }
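close dispatches on the iterator's runtime type because only the incremental iterator owns background resources (the scheduled fetch executor created above); fully materialized results have nothing to release. A small sketch of that type-directed cleanup, using hypothetical stand-in types:

import java.util.concurrent.Executors

// Sketch only: RowData/EagerRows/IncrementalRows are stand-ins, not Kyuubi types.
sealed trait RowData
class EagerRows extends RowData // fully buffered, nothing to release
class IncrementalRows extends RowData with AutoCloseable {
  private val executor = Executors.newSingleThreadScheduledExecutor()
  override def close(): Unit = executor.shutdownNow()
}

object CloseSketch {
  def closeIfNeeded(data: RowData): Unit = data match {
    case inc: IncrementalRows => inc.close() // stop the background fetch thread
    case _ => // eager results need no cleanup
  }

  def main(args: Array[String]): Unit = {
    closeIfNeeded(new EagerRows)
    closeIfNeeded(new IncrementalRows)
  }
}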
ResultSetUtil.scala
@@ -73,7 +73,7 @@ object ResultSetUtil
       throw new IllegalArgumentException("maxRows should be positive")
     }
     val schema = resultFetcher.getResultSchema
-    val ite = new QueryResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
+    val ite = new IncrementalResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
     ResultSet.builder
       .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
       .columns(schema.getColumns)
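Note that the factory keeps passing the configured maxRows through unchanged; whether the cap actually applies is now decided inside the iterator. A condensed sketch of that validate-then-build flow, with hypothetical stand-in types:

object ResultSetFactorySketch {
  // Sketch only: these types stand in for Kyuubi's iterator and ResultSet.
  final class SketchIterator(source: Iterator[Long], maxRows: Int, isQuery: Boolean)
      extends Iterator[Long] {
    // the cap is applied here, not by the caller
    private val limited = source.take(if (isQuery) maxRows else Int.MaxValue)
    def hasNext: Boolean = limited.hasNext
    def next(): Long = limited.next()
  }

  final case class SketchResultSet(rows: SketchIterator)

  def build(source: Iterator[Long], maxRows: Int, isQuery: Boolean): SketchResultSet = {
    // same precondition as ResultSetUtil above
    if (maxRows <= 0) throw new IllegalArgumentException("maxRows should be positive")
    SketchResultSet(new SketchIterator(source, maxRows, isQuery))
  }
}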
FlinkOperationSuite.scala
@@ -1148,6 +1148,22 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
         assert(rows === 200)
       }
     }
+    if (FLINK_RUNTIME_VERSION >= "1.17") {
+      withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
+        withJdbcStatement() { statement =>
+          for (i <- 0 to 10) {
+            statement.execute(s"create table tbl_src$i (a bigint) " +
+              s"with ('connector' = 'blackhole')")
+          }
+          val resultSet = statement.executeQuery("show tables")
+          var rows = 0
+          while (resultSet.next()) {
+            rows += 1
+          }
+          assert(rows === 11)
+        }
+      }
+    }
   }

   test("execute statement - add/show jar") {
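The new case creates eleven blackhole tables under a 10-row session cap and asserts that SHOW TABLES, a non-DQL statement, still returns all eleven names; before this change the result would have been cut off at ten rows. The counting loop is the standard JDBC drain idiom, sketched here as a small reusable helper:

import java.sql.ResultSet

object JdbcRowCount {
  // Drains a JDBC ResultSet and returns how many rows it produced,
  // mirroring the while-loop in the test above.
  def countRows(rs: ResultSet): Int = {
    var rows = 0
    while (rs.next()) rows += 1
    rows
  }
}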