Skip to content

Commit 2d8b9c1

Browse files
committed
[FLINK] Skip the max-rows limitation if the statement is not DQL
1 parent 25eb53d commit 2d8b9c1

File tree

4 files changed: +25 additions, −6 deletions
Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -36,8 +36,9 @@ import org.apache.flink.types.Row
3636
import org.apache.kyuubi.Logging
3737
import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
3838
import org.apache.kyuubi.operation.FetchIterator
39+
import org.apache.kyuubi.util.reflect.ReflectUtils.getField
3940

40-
class QueryResultFetchIterator(
41+
class IncrementalResultFetchIterator(
4142
resultFetcher: ResultFetcher,
4243
maxRows: Int = 1000000,
4344
resultFetchTimeout: Duration = Duration.Inf) extends FetchIterator[Row] with Logging {
@@ -58,6 +59,10 @@ class QueryResultFetchIterator(
5859

5960
val FETCH_INTERVAL_MS: Long = 1000
6061

62+
val isQueryResult: Boolean = getField(resultFetcher, "isQueryResult")
63+
64+
val effectiveMaxRows: Int = if (isQueryResult) maxRows else Int.MaxValue
65+
6166
private val executor = Executors.newSingleThreadScheduledExecutor(
6267
new ThreadFactoryBuilder().setNameFormat("flink-query-iterator-%d").setDaemon(true).build)
6368

@@ -78,7 +83,7 @@ class QueryResultFetchIterator(
7883
// if no timeout is set, this would block until some rows are fetched
7984
debug(s"Fetching from result store with timeout $resultFetchTimeout ms")
8085
while (!fetched && !Thread.interrupted()) {
81-
val rs = resultFetcher.fetchResults(token, maxRows - bufferedRows.length)
86+
val rs = resultFetcher.fetchResults(token, effectiveMaxRows - bufferedRows.length)
8287
val flinkRs = new FlinkResultSet(rs)
8388
// TODO: replace string-based match when Flink 1.16 support is dropped
8489
flinkRs.getResultType.name() match {
@@ -144,7 +149,7 @@ class QueryResultFetchIterator(
144149
debug(s"Fetching from buffered rows at pos $pos.")
145150
val row = bufferedRows(pos.toInt)
146151
pos += 1
147-
if (pos >= maxRows) {
152+
if (pos >= effectiveMaxRows) {
148153
hasNext = false
149154
}
150155
row
@@ -154,7 +159,7 @@ class QueryResultFetchIterator(
154159
if (hasNext) {
155160
val row = bufferedRows(pos.toInt)
156161
pos += 1
157-
if (pos >= maxRows) {
162+
if (pos >= effectiveMaxRows) {
158163
hasNext = false
159164
}
160165
row

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,7 @@ case class ResultSet(
5353

5454
def close: Unit = {
5555
data match {
56-
case queryIte: QueryResultFetchIterator => queryIte.close()
56+
case incIte: IncrementalResultFetchIterator => incIte.close()
5757
case _ =>
5858
}
5959
}

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -73,7 +73,7 @@ object ResultSetUtil {
7373
throw new IllegalArgumentException("maxRows should be positive")
7474
}
7575
val schema = resultFetcher.getResultSchema
76-
val ite = new QueryResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
76+
val ite = new IncrementalResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
7777
ResultSet.builder
7878
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
7979
.columns(schema.getColumns)

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1148,6 +1148,20 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
11481148
assert(rows === 200)
11491149
}
11501150
}
1151+
withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
1152+
withJdbcStatement() { statement =>
1153+
for (i <- 0 to 10) {
1154+
statement.execute(s"create table tbl_src$i (a bigint) " +
1155+
s"with ('connector' = 'blackhole')")
1156+
}
1157+
val resultSet = statement.executeQuery("show tables")
1158+
var rows = 0
1159+
while (resultSet.next()) {
1160+
rows += 1
1161+
}
1162+
assert(rows === 11)
1163+
}
1164+
}
11511165
}
11521166

11531167
test("execute statement - add/show jar") {

0 commit comments

Comments (0)