From d6486abbca0989506c9f6a8563f46a34d0989fd5 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Fri, 1 Apr 2022 12:31:43 +0200 Subject: [PATCH] Implement binary format support for SQL clear cursor - support binary_format parameter on _sql/close - make cursor close response consistent with the query protocol (ie. in ODBC/JDBC/CLI return CBOR response for cursor close - as for query) --- docs/changelog/84230.yaml | 6 ++ .../sql/jdbc/JdbcHttpClientRequestTests.java | 8 +++ .../jdbc/single_node/JdbcCloseCursorIT.java | 11 ++++ .../sql/qa/jdbc/CloseCursorTestCase.java | 60 +++++++++++++++++++ .../sql/action/SqlClearCursorRequest.java | 22 ++++++- .../action/SqlClearCursorRequestTests.java | 8 ++- .../sql/action/SqlRequestParsersTests.java | 10 +++- .../sql/action/TestSqlClearCursorRequest.java | 3 +- .../xpack/sql/client/HttpClient.java | 2 +- .../xpack/sql/proto/Payloads.java | 16 ++--- .../sql/proto/SqlClearCursorRequest.java | 12 +++- .../sql/plugin/RestSqlClearCursorAction.java | 22 ++++++- 12 files changed, 160 insertions(+), 20 deletions(-) create mode 100644 docs/changelog/84230.yaml create mode 100644 x-pack/plugin/sql/qa/jdbc/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/jdbc/single_node/JdbcCloseCursorIT.java create mode 100644 x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/CloseCursorTestCase.java diff --git a/docs/changelog/84230.yaml b/docs/changelog/84230.yaml new file mode 100644 index 0000000000000..ded29cebcbe91 --- /dev/null +++ b/docs/changelog/84230.yaml @@ -0,0 +1,6 @@ +pr: 84230 +summary: Implement binary format support for SQL clear cursor +area: SQL +type: bug +issues: + - 53359 diff --git a/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java b/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java index af28adfe70c01..85f1b6e84f9fe 100644 --- a/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java +++ b/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java @@ -95,6 +95,14 @@ private void assertBinaryRequest(boolean isBinary, XContentType xContentType) th logger.info("Ignored SQLException", e); } assertValues(isBinary, xContentType); + + prepareMockResponse(); + try { + httpClient.queryClose(""); + } catch (SQLException e) { + logger.info("Ignored SQLException", e); + } + assertValues(isBinary, xContentType); } private void assertValues(boolean isBinary, XContentType xContentType) { diff --git a/x-pack/plugin/sql/qa/jdbc/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/jdbc/single_node/JdbcCloseCursorIT.java b/x-pack/plugin/sql/qa/jdbc/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/jdbc/single_node/JdbcCloseCursorIT.java new file mode 100644 index 0000000000000..827bd9550c108 --- /dev/null +++ b/x-pack/plugin/sql/qa/jdbc/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/jdbc/single_node/JdbcCloseCursorIT.java @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.sql.qa.jdbc.single_node; + +import org.elasticsearch.xpack.sql.qa.jdbc.CloseCursorTestCase; + +public class JdbcCloseCursorIT extends CloseCursorTestCase {} diff --git a/x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/CloseCursorTestCase.java b/x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/CloseCursorTestCase.java new file mode 100644 index 0000000000000..b108a4ded7b9c --- /dev/null +++ b/x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/CloseCursorTestCase.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.sql.qa.jdbc; + +import org.elasticsearch.core.CheckedConsumer; +import org.junit.Before; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +public abstract class CloseCursorTestCase extends JdbcIntegrationTestCase { + + @Before + public void initIndex() throws IOException { + index("library", "1", builder -> { builder.field("name", "foo"); }); + index("library", "2", builder -> { builder.field("name", "bar"); }); + index("library", "3", builder -> { builder.field("name", "baz"); }); + } + + public void testCloseCursor() throws SQLException { + doWithQuery("SELECT name FROM library", results -> { + assertTrue(results.next()); + results.close(); // force sending a cursor close since more pages are available + assertTrue(results.isClosed()); + }); + } + + public void testCloseConsumedCursor() throws SQLException { + doWithQuery("SELECT name FROM library", results -> { + for (int i = 0; i < 3; i++) { + assertTrue(results.next()); + } + assertFalse(results.next()); + results.close(); + assertTrue(results.isClosed()); + }); + } + + public void testCloseNoCursor() throws SQLException { + doWithQuery("SELECT name FROM library WHERE name = 'zzz'", results -> { + results.close(); + assertTrue(results.isClosed()); + }); + } + + private void doWithQuery(String query, CheckedConsumer consumer) throws SQLException { + try (Connection connection = createConnection(connectionProperties()); Statement statement = connection.createStatement()) { + statement.setFetchSize(1); + ResultSet results = statement.executeQuery(query); + consumer.accept(results); + } + } +} diff --git a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequest.java b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequest.java index cf0dcd3e1ff5d..38557b902694e 100644 --- a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequest.java +++ b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequest.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.proto.RequestInfo; @@ -24,12 +25,16 @@ import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.CURSOR; import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.MODE; import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.VERSION; +import static org.elasticsearch.xpack.sql.proto.CoreProtocol.BINARY_COMMUNICATION; +import static org.elasticsearch.xpack.sql.proto.CoreProtocol.BINARY_FORMAT_NAME; /** * Request to clean all SQL resources associated with the cursor */ public class SqlClearCursorRequest extends AbstractSqlRequest { + static final ParseField BINARY_FORMAT = new ParseField(BINARY_FORMAT_NAME); + private static final ConstructingObjectParser PARSER = // here the position in "objects" is the same as the fields parser declarations below new ConstructingObjectParser<>(SqlClearCursorAction.NAME, objects -> { @@ -43,9 +48,11 @@ public class SqlClearCursorRequest extends AbstractSqlRequest { PARSER.declareString(optionalConstructorArg(), MODE); PARSER.declareString(optionalConstructorArg(), CLIENT_ID); PARSER.declareString(optionalConstructorArg(), VERSION); + PARSER.declareBoolean(SqlClearCursorRequest::binaryCommunication, BINARY_FORMAT); } private String cursor; + private Boolean binaryCommunication = BINARY_COMMUNICATION; public SqlClearCursorRequest() {} @@ -80,12 +87,23 @@ public String getDescription() { public SqlClearCursorRequest(StreamInput in) throws IOException { super(in); cursor = in.readString(); + binaryCommunication = in.readOptionalBoolean(); + } + + public SqlClearCursorRequest binaryCommunication(Boolean binaryCommunication) { + this.binaryCommunication = binaryCommunication; + return this; + } + + public Boolean binaryCommunication() { + return this.binaryCommunication; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(cursor); + out.writeOptionalBoolean(binaryCommunication); } @Override @@ -94,12 +112,12 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; if (super.equals(o) == false) return false; SqlClearCursorRequest that = (SqlClearCursorRequest) o; - return Objects.equals(cursor, that.cursor); + return Objects.equals(cursor, that.cursor) && Objects.equals(binaryCommunication, that.binaryCommunication); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cursor); + return Objects.hash(super.hashCode(), cursor, binaryCommunication); } public static SqlClearCursorRequest fromXContent(XContentParser parser) { diff --git a/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequestTests.java b/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequestTests.java index ea720e88b1d32..03d6c21297f96 100644 --- a/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequestTests.java +++ b/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequestTests.java @@ -36,7 +36,9 @@ protected TestSqlClearCursorRequest createXContextTestInstance(XContentType xCon @Override protected TestSqlClearCursorRequest createTestInstance() { - return new TestSqlClearCursorRequest(requestInfo, randomAlphaOfLength(100)); + TestSqlClearCursorRequest result = new TestSqlClearCursorRequest(requestInfo, randomAlphaOfLength(100)); + result.binaryCommunication(randomBoolean()); + return result; } @Override @@ -58,9 +60,11 @@ protected TestSqlClearCursorRequest mutateInstance(TestSqlClearCursorRequest ins @SuppressWarnings("unchecked") Consumer mutator = randomFrom( request -> request.requestInfo(randomValueOtherThan(request.requestInfo(), this::randomRequestInfo)), - request -> request.setCursor(randomValueOtherThan(request.getCursor(), SqlQueryResponseTests::randomStringCursor)) + request -> request.setCursor(randomValueOtherThan(request.getCursor(), SqlQueryResponseTests::randomStringCursor)), + request -> request.binaryCommunication(randomValueOtherThan(request.binaryCommunication(), () -> randomBoolean())) ); TestSqlClearCursorRequest newRequest = new TestSqlClearCursorRequest(instance.requestInfo(), instance.getCursor()); + newRequest.binaryCommunication(instance.binaryCommunication()); mutator.accept(newRequest); return newRequest; } diff --git a/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlRequestParsersTests.java b/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlRequestParsersTests.java index b12b3a1cfa283..4c05f012699b8 100644 --- a/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlRequestParsersTests.java +++ b/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlRequestParsersTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.sql.proto.CoreProtocol; import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue; @@ -61,23 +62,27 @@ public void testClearCursorRequestParser() throws IOException { "cursor": "whatever", "mode": "%s", "client_id": "bla", - "version": "1.2.3" + "version": "1.2.3", + "binary_format": true }""".formatted(randomMode), SqlClearCursorRequest::fromXContent); assertNull(request.clientId()); assertNull(request.version()); assertEquals(randomMode, request.mode()); assertEquals("whatever", request.getCursor()); + assertTrue(request.binaryCommunication()); randomMode = randomFrom(Mode.values()); request = generateRequest(""" { "cursor": "whatever", "mode": "%s", - "client_id": "bla" + "client_id": "bla", + "binary_format": false }""".formatted(randomMode.toString()), SqlClearCursorRequest::fromXContent); assertNull(request.clientId()); assertEquals(randomMode, request.mode()); assertEquals("whatever", request.getCursor()); + assertFalse(request.binaryCommunication()); request = generateRequest("{\"cursor\" : \"whatever\"}", SqlClearCursorRequest::fromXContent); assertNull(request.clientId()); @@ -94,6 +99,7 @@ public void testClearCursorRequestParser() throws IOException { assertNull(request.version()); assertEquals(Mode.PLAIN, request.mode()); assertEquals("whatever", request.getCursor()); + assertEquals(CoreProtocol.BINARY_COMMUNICATION, request.binaryCommunication()); request = generateRequest(""" {"cursor" : "whatever", "client_id" : "cANVAs"}""", SqlClearCursorRequest::fromXContent); diff --git a/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/TestSqlClearCursorRequest.java b/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/TestSqlClearCursorRequest.java index d4e162f0e2f98..f77edde501d02 100644 --- a/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/TestSqlClearCursorRequest.java +++ b/x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/TestSqlClearCursorRequest.java @@ -33,7 +33,8 @@ public TestSqlClearCursorRequest(RequestInfo requestInfo, String cursor) { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest protoInstance = new org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest( this.getCursor(), - this.requestInfo() + this.requestInfo(), + this.binaryCommunication() ); return SqlTestUtils.toXContentBuilder(builder, g -> Payloads.generate(g, protoInstance)); } diff --git a/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java b/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java index 70702a16dab97..871aabf372a9a 100644 --- a/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java +++ b/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java @@ -124,7 +124,7 @@ public SqlQueryResponse nextPage(String cursor) throws SQLException { public boolean queryClose(String cursor, Mode mode) throws SQLException { ResponseWithWarnings response = post( CoreProtocol.CLEAR_CURSOR_REST_ENDPOINT, - new SqlClearCursorRequest(cursor, new RequestInfo(mode)), + new SqlClearCursorRequest(cursor, new RequestInfo(mode), cfg.binaryCommunication()), Payloads::parseClearCursorResponse ); return response.response().isSucceeded(); diff --git a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Payloads.java b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Payloads.java index 959540f148394..e60791bf9a701 100644 --- a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Payloads.java +++ b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Payloads.java @@ -170,6 +170,7 @@ public static void generate(JsonGenerator generator, SqlClearCursorRequest reque generator.writeStringField("mode", request.mode().toString()); writeIfValid(generator, "client_id", request.clientId()); writeIfValidAsString(generator, "version", request.version()); + writeIfValid(generator, "binary_format", request.binaryCommunication()); generator.writeEndObject(); } @@ -222,20 +223,15 @@ public static void generate( if (request.pageTimeout() != CoreProtocol.PAGE_TIMEOUT) { generator.writeStringField(PAGE_TIMEOUT_NAME, request.pageTimeout().getStringRep()); } - if (request.columnar() != null) { - generator.writeBooleanField(COLUMNAR_NAME, request.columnar()); - } + writeIfValid(generator, COLUMNAR_NAME, request.columnar()); if (request.fieldMultiValueLeniency()) { generator.writeBooleanField(FIELD_MULTI_VALUE_LENIENCY_NAME, request.fieldMultiValueLeniency()); } if (request.indexIncludeFrozen()) { generator.writeBooleanField(INDEX_INCLUDE_FROZEN_NAME, request.indexIncludeFrozen()); } - if (request.binaryCommunication() != null) { - generator.writeBooleanField(BINARY_FORMAT_NAME, request.binaryCommunication()); - } + writeIfValid(generator, BINARY_FORMAT_NAME, request.binaryCommunication()); writeIfValid(generator, CURSOR_NAME, request.cursor()); - writeIfValidAsString(generator, WAIT_FOR_COMPLETION_TIMEOUT_NAME, request.waitForCompletionTimeout(), TimeValue::getStringRep); if (request.keepOnCompletion()) { @@ -255,6 +251,12 @@ private static void writeIfValid(JsonGenerator generator, String name, String va } } + private static void writeIfValid(JsonGenerator generator, String name, Boolean value) throws IOException { + if (value != null) { + generator.writeBooleanField(name, value); + } + } + private static void writeIfValidAsString(JsonGenerator generator, String name, Object value) throws IOException { writeIfValidAsString(generator, name, value, Object::toString); } diff --git a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlClearCursorRequest.java b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlClearCursorRequest.java index d94766e0c867b..3b116f3259dd4 100644 --- a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlClearCursorRequest.java +++ b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlClearCursorRequest.java @@ -14,27 +14,33 @@ public class SqlClearCursorRequest extends AbstractSqlRequest { private final String cursor; + private final Boolean binaryCommunication; - public SqlClearCursorRequest(String cursor, RequestInfo requestInfo) { + public SqlClearCursorRequest(String cursor, RequestInfo requestInfo, Boolean binaryCommunication) { super(requestInfo); this.cursor = cursor; + this.binaryCommunication = binaryCommunication; } public String getCursor() { return cursor; } + public Boolean binaryCommunication() { + return binaryCommunication; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (super.equals(o) == false) return false; SqlClearCursorRequest that = (SqlClearCursorRequest) o; - return Objects.equals(cursor, that.cursor); + return Objects.equals(cursor, that.cursor) && Objects.equals(binaryCommunication, that.binaryCommunication); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cursor); + return Objects.hash(super.hashCode(), cursor, binaryCommunication); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlClearCursorAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlClearCursorAction.java index 5abf2f8a2f01a..e5bfe9072f4c5 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlClearCursorAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlClearCursorAction.java @@ -10,12 +10,19 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestResponseListener; +import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.sql.action.Protocol; import org.elasticsearch.xpack.sql.action.SqlClearCursorAction; import org.elasticsearch.xpack.sql.action.SqlClearCursorRequest; +import org.elasticsearch.xpack.sql.action.SqlClearCursorResponse; +import org.elasticsearch.xpack.sql.proto.Mode; import java.io.IOException; import java.util.List; @@ -40,7 +47,18 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli sqlRequest = SqlClearCursorRequest.fromXContent(parser); } - return channel -> client.executeLocally(SqlClearCursorAction.INSTANCE, sqlRequest, new RestToXContentListener<>(channel)); + return channel -> client.executeLocally(SqlClearCursorAction.INSTANCE, sqlRequest, new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(SqlClearCursorResponse response) throws Exception { + Boolean binaryRequest = sqlRequest.binaryCommunication(); + XContentType type = Boolean.TRUE.equals(binaryRequest) || (binaryRequest == null && Mode.isDriver(sqlRequest.mode())) + ? XContentType.CBOR + : XContentType.JSON; + XContentBuilder builder = channel.newBuilder(request.getXContentType(), type, false); + response.toXContent(builder, request); + return new BytesRestResponse(RestStatus.OK, builder); + } + }); } @Override