Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ES|QL helpers #762

Merged
merged 2 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.elasticsearch._helpers.esql;

import co.elastic.clients.ApiClient;
import co.elastic.clients.elasticsearch.esql.QueryRequest;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BinaryResponse;

import java.io.IOException;

/**
* A deserializer for ES|QL responses.
*/
public interface EsqlAdapter<Result> {
/**
* ESQL result format this deserializer accepts (text, csv, json, arrow, etc.)
*/
String format();

/**
* For JSON results, whether the result should be organized in rows or columns
*/
boolean columnar();

/**
* Deserialize the raw http response returned by the server
*/
Result deserialize(ApiClient<ElasticsearchTransport, ?> client, QueryRequest request, BinaryResponse response) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.elasticsearch._helpers.esql;

import co.elastic.clients.json.JsonpDeserializer;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.JsonpMappingException;
import co.elastic.clients.json.JsonpUtils;
import jakarta.json.stream.JsonParser;

import java.util.List;

public abstract class EsqlAdapterBase<T> implements EsqlAdapter<T> {

/**
* Reads the header of an ES|QL response, moving the parser at the beginning of the first value row.
* The caller can then read row arrays until finding an end array that closes the top-level array.
*/
public static EsqlMetadata readHeader(JsonParser parser, JsonpMapper mapper) {
JsonpUtils.expectNextEvent(parser, JsonParser.Event.START_OBJECT);
JsonpUtils.expectNextEvent(parser, JsonParser.Event.KEY_NAME);

if (!"columns".equals(parser.getString())) {
throw new JsonpMappingException("Expecting a 'columns' property, but found '" + parser.getString() + "'", parser.getLocation());
}

List<EsqlMetadata.EsqlColumn> columns = JsonpDeserializer
.arrayDeserializer(EsqlMetadata.EsqlColumn._DESERIALIZER)
.deserialize(parser, mapper);

EsqlMetadata result = new EsqlMetadata();
result.columns = columns;

JsonpUtils.expectNextEvent(parser, JsonParser.Event.KEY_NAME);

if (!"values".equals(parser.getString())) {
throw new JsonpMappingException("Expecting a 'values' property, but found '" + parser.getString() + "'", parser.getLocation());
}

JsonpUtils.expectNextEvent(parser, JsonParser.Event.START_ARRAY);

return result;
}

/**
* Checks the footer of an ES|QL response, once the values have been read.
*/
public static void readFooter(JsonParser parser) {
JsonpUtils.expectNextEvent(parser, JsonParser.Event.END_OBJECT);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.elasticsearch._helpers.esql;

import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch.esql.ElasticsearchEsqlAsyncClient;
import co.elastic.clients.elasticsearch.esql.ElasticsearchEsqlClient;
import co.elastic.clients.elasticsearch.esql.QueryRequest;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.transport.endpoints.BinaryResponse;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class EsqlHelper {

//----- Synchronous

public static <T> T query(
ElasticsearchEsqlClient client, EsqlAdapter<T> adapter, String query, Object... params
) throws IOException {
QueryRequest request = buildRequest(adapter, query, params);
BinaryResponse response = client.query(request);
return adapter.deserialize(client, request, response);
}

public static <T> T query(ElasticsearchEsqlClient client, EsqlAdapter<T> adapter, QueryRequest request) throws IOException {
request = buildRequest(adapter, request);
BinaryResponse response = client.query(request);
return adapter.deserialize(client, request, response);
}

//----- Asynchronous

public static <T> CompletableFuture<T> queryAsync(
ElasticsearchEsqlAsyncClient client, EsqlAdapter<T> adapter, String query, Object... params
) {
return doQueryAsync(client, adapter, buildRequest(adapter, query, params));
}

public static <T> CompletableFuture<T> queryAsync(
ElasticsearchEsqlAsyncClient client, EsqlAdapter<T> adapter, QueryRequest request
) {
return doQueryAsync(client, adapter, buildRequest(adapter, request));
}

private static <T> CompletableFuture<T> doQueryAsync(
ElasticsearchEsqlAsyncClient client, EsqlAdapter<T> adapter, QueryRequest request
) {
return client
.query(request)
.thenApply(r -> {
try {
return adapter.deserialize(client, request, r);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

//----- Utilities

private static QueryRequest buildRequest(EsqlAdapter<?> adapter, String query, Object... params) {
return QueryRequest.of(esql -> esql
.format(adapter.format())
.columnar(adapter.columnar())
.query(query)
.params(asFieldValues(params))
);
}

private static QueryRequest buildRequest(EsqlAdapter<?> adapter, QueryRequest request) {
return QueryRequest.of(q -> q
// Set/override format and columnar
.format(adapter.format())
.columnar(adapter.columnar())

.delimiter(request.delimiter())
.filter(request.filter())
.locale(request.locale())
.params(request.params())
.query(request.query())
);
}

private static List<FieldValue> asFieldValues(Object... objects) {

List<FieldValue> result = new ArrayList<>(objects.length);
for (Object object: objects) {
result.add(FieldValue.of(JsonData.of(object)));
}

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.elasticsearch._helpers.esql;

import co.elastic.clients.json.JsonpDeserializable;
import co.elastic.clients.json.JsonpDeserializer;
import co.elastic.clients.json.ObjectBuilderDeserializer;
import co.elastic.clients.json.ObjectDeserializer;
import co.elastic.clients.util.ObjectBuilder;
import co.elastic.clients.util.ObjectBuilderBase;

import java.util.List;

public class EsqlMetadata {

@JsonpDeserializable
public static class EsqlColumn {
private String name;
private String type;

public String name() {
return name;
}

public String type() {
return type;
}

public static class Builder extends ObjectBuilderBase implements ObjectBuilder<EsqlColumn> {
EsqlColumn object = new EsqlColumn();

public Builder name(String value) {
object.name = value;
return this;
}

public Builder type(String value) {
object.type = value;
return this;
}

@Override
public EsqlColumn build() {
_checkSingleUse();
return object;
}
}

public static final JsonpDeserializer<EsqlColumn> _DESERIALIZER = ObjectBuilderDeserializer.lazy(
EsqlColumn.Builder::new, EsqlColumn::setupEsqlColumnDeserializer
);

protected static void setupEsqlColumnDeserializer(ObjectDeserializer<EsqlColumn.Builder> op) {
op.add(EsqlColumn.Builder::name, JsonpDeserializer.stringDeserializer(), "name");
op.add(EsqlColumn.Builder::type, JsonpDeserializer.stringDeserializer(), "type");
}
}

public List<EsqlColumn> columns;
}
Loading
Loading