Stream the result from Search Backend instead of use List. #481
Conversation
Nice work! I'm looking forward to reviewing in more detail. A couple general questions up front.
- It looks like the scroll api is always used now? Or can standard search api still be used with configuration update or based on situation (e.g. result count)? It seems that there are use cases to prefer search over scroll api. In particular Elasticsearch scroll documentation says "scrolling is not intended for real time user requests". Also would users of some raw index query features (see Actually process raw query parameters. Issue 338 #423) require the search instead of scroll api?
- Depending on the answer to the above, are there tests that hit both scroll/non-scroll branches?
- Why was max-result-size configuration item removed? I think having an overall max-result-size configuration is still valuable, regardless of whether results are being paginated or scrolled, or is this enforced somewhere else now?
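For illustration, the search-vs-scroll trade-off raised above can be sketched as a simple decision rule. This is a hedged sketch, not the PR's actual code; the names `useScroll` and `batchSize` are illustrative:

```java
// Hypothetical sketch of the search-vs-scroll decision discussed above.
// Plain search suits small, latency-sensitive queries; the scroll API
// suits deep result sets. Names are illustrative, not from the PR.
public class SearchModeChooser {
    /**
     * Scroll only when the requested number of results exceeds what a
     * single search request (one batch) can return.
     */
    public static boolean useScroll(int limit, int batchSize) {
        return limit > batchSize;
    }
}
```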
@@ -1256,7 +1258,7 @@ else if (query.getResultType() == ElementCategory.VERTEX) {
Iterator<JanusGraphElement> iter;
if (!indexQuery.isEmpty()) {
List<QueryUtil.IndexCall<Object>> retrievals = new ArrayList<QueryUtil.IndexCall<Object>>();
for (int i = 0; i < indexQuery.size(); i++) {
for (int i = 1; i < indexQuery.size(); i++) {
Why does the start index change from 0 to 1?
Because I use the first query to initialize the SubqueryIterator on line 1280:
iter = new SubqueryIterator(indexQuery.getQuery(0), indexSerializer, txHandle, indexCache, indexQuery.getLimit(), getConversionFunction(query.getResultType()),
retrievals.isEmpty() ? null: QueryUtil.processIntersectingRetrievals(retrievals, indexQuery.getLimit()));
For Solr, I'm not sure, because I cannot use a cursor. I may need to restore max-result-size for Solr?
Some feedback from first pass through.
IndexType index = query.getIndex();
if (index.isCompositeIndex()) {
MultiKeySliceQuery sq = query.getCompositeQuery();
List<EntryList> rs = sq.execute(tx);
List<Object> results = new ArrayList<Object>(rs.get(0).size());
List<Object> results = new ArrayList<>(rs.get(0).size());
Use final here and throughout the PR.
final QueryResponse response = runCommonQuery(query, informations, tx, collection, keyIdField);
logger.debug("Executed query [{}] in {} ms", query.getQuery(), response.getElapsedTime());
return response.getResults().getNumFound();
try {
Fix indentation
@Override
public boolean hasNext() {
if(cpt!=0 && cpt % nbDocByQuery == nbDocByQuery/2 && nbDocByQuery * nbRequest < limit){
- Can you add tests to hit any missing cases here (e.g. actual_count<nbDocByQuery, actual_count==nbDocByQuery, actual_count>nbDocByQuery, limit==actual_count, limit%nbDocByQuery==0, etc.)?
- Why is the middle condition not cpt % nbDocByQuery == 0?
- Add whitespace: if (...) {
- Isn't it already tested? I can add tests.
- At the beginning of the PR I wanted to delegate the search to a stream (so I queried the stream once I had consumed half of the previous batch), but I ran into some issues, so I split the PR in two.
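For illustration, the reviewer's suggested boundary condition would look roughly like the sketch below. All names (`needsNextBatch`, `batchSize`, `fetched`) are illustrative stand-ins for the PR's `cpt`/`nbDocByQuery`/`nbRequest` fields, not its actual code:

```java
// Sketch of a batched iterator's "fetch next batch" condition using the
// reviewer's suggestion (cpt % batchSize == 0), i.e. request the next
// batch only once the current one is fully consumed. Illustrative only.
public class BatchBoundary {
    public static boolean needsNextBatch(long cpt, int batchSize, long fetched, long limit) {
        return cpt != 0              // at least one document consumed
            && cpt % batchSize == 0  // current batch exhausted
            && fetched < limit;      // overall limit not yet reached
    }
}
```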
this.collection = collection;
this.solrQuery = solrQuery;
this.function = function;
long nbFound = solrClient.queryAndStreamResponse(collection, solrQuery, new SolrCallbackHandler(this, function)).getResults().getNumFound() - offset;
IO/SolrExceptions here are propagated to SolrIndex, which wraps them as a PermanentBackendException, whereas below (hasNext/next) RuntimeExceptions are being thrown directly to the client (e.g. uncaught in SolrIndex). I think the behavior should be consistent with top-level exception as PermanentBackendException. I wouldn't catch generic RuntimeException in SolrIndex, but maybe propagate and catch a more specific/custom implementation below?
I'm in a stream inside a map or an iterator, so I have to throw a RuntimeException, and it has to be caught when the stream is collected? So I just created a specific RuntimeException.
solrClient.queryAndStreamResponse(collection, solrQuery, new SolrCallbackHandler(this, function));
nbRequest++;
} catch (SolrServerException | IOException e) {
throw new RuntimeException(e);
Emit something more specific and then catch and wrap as PermanentBackendException in SolrIndex?
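The pattern being asked for — a specific unchecked wrapper thrown inside the iterator, then caught and rewrapped at the index level — might look like this sketch. Class and method names are illustrative placeholders (the nested `PermanentBackendException` here stands in for the real JanusGraph class), not the PR's code:

```java
import java.io.IOException;

// Sketch of the wrap-then-rewrap exception strategy discussed above:
// a specific unchecked type escapes the Iterator, and the index-level
// caller converts it to the backend's checked exception. Illustrative.
public class ExceptionFlow {
    public static class UncheckedBackendException extends RuntimeException {
        public UncheckedBackendException(Exception cause) { super(cause); }
    }

    public static class PermanentBackendException extends Exception {
        public PermanentBackendException(Throwable cause) { super(cause); }
    }

    // Stands in for Iterator.next(): may only throw unchecked exceptions.
    public static String next() {
        throw new UncheckedBackendException(new IOException("backend down"));
    }

    // Stands in for the index-level query method, which restores the
    // checked exception contract expected by clients.
    public static String query() throws PermanentBackendException {
        try {
            return next();
        } catch (UncheckedBackendException e) {
            throw new PermanentBackendException(e.getCause());
        }
    }
}
```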
.setRows(query.hasLimit() ? query.getLimit() : maxResults);
.setStart(query.getOffset());
if (query.hasLimit()) solrQuery.setRows(Math.min(query.getLimit(), nbResultByQuery));
else solrQuery.setRows(nbResultByQuery);
Newlines and curly brackets
if (query.hasLimit()) sr.setSize(query.getLimit());
else sr.setSize(maxResultsSize);
if (query.hasLimit()) sr.setSize(Math.min(query.getLimit(), nbResultByQuery));
else sr.setSize(nbResultByQuery);
Newlines and curly braces
/**
* @author David Clement (david.clement90@laposte.net)
*/
public class ElasticSearchScroll implements Iterator<RawQuery.Result<String>>{
White space before curly braces. Please fix here and throughout PR.
if (isFinished) client.deleteScroll(scrollId);
return res.nbResults() > 0;
} catch (IOException e) {
throw new RuntimeException(e);
Consistent exception handling (see comments in SolrResultIterator)
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
Consistent exception handling (see comments in SolrResultIterator)
Thanks for your responses, they all make good sense. I like the change to use the search api if the result is less than nbResultByQuery. Rather than adding the new parameter, what about keeping max-result-size and just updating documentation to indicate that for index backends that support scrolling it will represent the number of results in each batch? Regarding deleting the scroll context, it looks like you're just clearing after completing iteration over the given batch? Do you handle cases where iteration does not complete (e.g. client code stops iterating early, exceptions occur during processing, etc.), in terms of clearing the last scroll? If not, I'm wondering about implications for the case where the same graph stays open during the (potentially long) lifetime of an application, where these uncleared scrolls might stack up.
Some more feedback is below. I think limiting the custom RuntimeException just to the Solr module would be preferable to PermanentBackendRuntimeException in core. In Elasticsearch and Lucene the IOExceptions can be wrapped in the existing UncheckedIOException. It's also important to catch whatever is thrown in ElasticsearchIndex and SolrIndex so the final exception to the client is consistent (e.g. PermanentBackendException).
I still think adding the scroll implementation around the nbResultsByQuery (or batchSize) does need tests that probably wouldn't have existed in the existing single-batch implementation. Cases to cover if they're not already: actual_count<batchSize, actual_count==batchSize, actual_count>batchSize, limit==actual_count, limit%batchSize==0, etc.
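As a rough aid for writing those tests, the expected number of backend requests in each case is simple arithmetic. This sketch assumes a simplified model in which exhaustion is detected within the last data-bearing response; names are illustrative, and a real scrolling backend may need one extra empty-response round trip:

```java
// Sketch: expected backend request count given the actual result count,
// the batch size, and the query limit. Pure arithmetic, illustrative only.
public class ScrollMath {
    public static long expectedRequests(long actualCount, int batchSize, long limit) {
        long toFetch = Math.min(actualCount, limit);
        // At least one request is always issued; ceiling-divide after that.
        return Math.max(1, (toFetch + batchSize - 1) / batchSize);
    }
}
```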
this("Permanent failure in storage backend", cause);
}
Delete extra newline
@@ -79,11 +80,11 @@
* @param query Query to execute
* @param informations Information on the keys used in the query accessible through {@link KeyInformation.IndexRetriever}.
* @param tx Enclosing transaction
* @return The ids of all matching documents
* @return An iterator of all matching documents ids
I think you could keep language as is "The ids of all matching documents" but if you want to change it looks like it should be stream, not iterator.
@@ -92,11 +93,11 @@
* @param query Query to execute
* @param informations Information on the keys used in the query accessible through {@link KeyInformation.IndexRetriever}.
* @param tx Enclosing transaction
* @return Results objects for all matching documents (i.e. document id and score)
* @return Iterator on objects for all matching documents (i.e. document id and score)
Ditto ... I think you could keep language as is but if you want to change it looks like it should be stream, not iterator.
@@ -892,8 +892,9 @@ public boolean apply(@Nullable Integer uniqueIdBitWidth) {
ConfigOption.Type.MASKABLE, String.class);

public static final ConfigOption<Integer> INDEX_MAX_RESULT_SET_SIZE = new ConfigOption<Integer>(INDEX_NS, "max-result-set-size",
"Maxium number of results to return if no limit is specified",
ConfigOption.Type.MASKABLE, 100000);
"Maxium number of results to return if no limit is specified. For index backends that support scrolling, it represent " +
/represent/represents/
@@ -218,7 +223,7 @@ public static Mode parse(String mode) {
private final boolean dynFields;
private final Map<String, String> keyFieldIds;
private final String ttlField;
private final int maxResults;
private final int nbResultByQuery;
How about "batchSize" instead of "nbResultByQuery"?
@@ -110,4 +111,11 @@ public void setResultSize(long size) {
return result;
}

public static QueryProfiler startProfile(QueryProfiler profiler, Subquery query) {
QueryProfiler sub = profiler.addNested("backend-query");
final
*/
public class StreamIterable<E> implements Iterable<E>{

private Stream<E> stream;
final
private final Cache<JointIndexQuery.Subquery, List<Object>> indexCache;

private Iterator<? extends JanusGraphElement> itElement;
/itElement/elementIterator/
Function<Object, ? extends JanusGraphElement> function, List<Object> otherResults) {
this.subQuery = subQuery;
this.indexCache = indexCache;
List<Object> cacheResponse;
Use final here and throughout the PR.
This one is not final because cacheResponse is set on line 59, but for all the others (and even more) I updated my Eclipse save actions.
We need to work on #242...
It can be final. It's just declared here and only initialized once (on L59).
@@ -187,7 +194,7 @@
private final AbstractESCompat compat;
private final ElasticSearchClient client;
private final String indexName;
private final int maxResultsSize;
private final int nbResultByQuery;
/nbResultsByQuery/batchSize/
I only added a test for the Solr implementation of scrolling, because the ES one is based on the Elasticsearch mechanism (it responds empty when the scroll is over), and the Lucene implementation does not change anymore.
Looking good. Some more feedback on latest updates.
* @author davidclement@laposte.net
*
*/
public class UncheckedIOException extends RuntimeException {
Don't need this class as there's already a java.io.UncheckedIOException.
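For illustration, the standard JDK class already covers this use case inside iterator/stream code; the `fetch`/`nextBatch` method names in this sketch are placeholders, not the PR's code:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Sketch showing java.io.UncheckedIOException (JDK 8+) wrapping a checked
// IOException so it can escape code, such as Iterator.next(), that cannot
// declare checked exceptions. fetch() is a placeholder.
public class ScrollStep {
    private static String fetch() throws IOException {
        throw new IOException("simulated backend failure");
    }

    public static String nextBatch() {
        try {
            return fetch();
        } catch (IOException e) {
            // Callers can unwrap the original IOException via getCause().
            throw new UncheckedIOException(e);
        }
    }
}
```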
@@ -91,7 +91,9 @@ public Connection connect(Configuration config) throws IOException {
}
RestClient rc = RestClient.builder(hosts.toArray(new HttpHost[hosts.size()])).build();

RestElasticSearchClient client = new RestElasticSearchClient(rc);
int scrollKeepAlive = config.get(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE);
final
RestElasticSearchClient client = new RestElasticSearchClient(rc);
int scrollKeepAlive = config.get(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE);
Preconditions.checkArgument(scrollKeepAlive >= 1, "Scroll Keep alive should be greater or equals than 1");
RestElasticSearchClient client = new RestElasticSearchClient(rc, scrollKeepAlive);
final
@@ -274,6 +279,26 @@ public RestSearchResponse search(String indexName, String type, Map<String,Object
}
}

@Override
public RestSearchResponse search(String scrollId) throws IOException {
Map<String, Object> request = new HashMap<>();
final
@@ -151,4 +157,38 @@ public void testSupport() {
assertTrue(index.supports(of(UUID.class, Cardinality.SINGLE), Cmp.NOT_EQUAL));
}

@Test
public void testScroll() throws BackendException {
This test looks good (thanks for adding it). Can you elevate it to IndexProviderTest? I'd like to make sure we're covering all branches in ElasticSearchScroll as well. Though scrolling is not used in all implementations (e.g. Lucene) the test should still pass. If you want to exclude it from running under Lucene you could add an abstract boolean supportsBatching() or something to IndexProviderTest (or else add scrolling as an index feature).
this.subQuery = subQuery;
this.indexCache = indexCache;
List<Object> cacheResponse;
Stream<?> stream;
Also can be final
ElasticSearchResponse response;
try {
response = client.search(getIndexStoreName(query.getStore()), useMultitypeIndex ? query.getStore() : null, compat.createRequestBody(sr, NULL_PARAMETERS));
} catch (IOException e) {
response = client.search(getIndexStoreName(query.getStore()), useMultitypeIndex ? query.getStore() : null, compat.createRequestBody(sr, NULL_PARAMETERS), sr.getSize() >= batchSize);
Define index and type on separate lines to get this line length a little more manageable.
/**
* @param msg Exception message
*/
public UncheckedSolrException(String msg) {
Change to public UncheckedSolrException(SolrServerException cause).
I kept this one, but I changed public UncheckedSolrException(Throwable cause) to public UncheckedSolrException(SolrServerException cause).
I'm looking at java.io.UncheckedIOException as a model here. Requiring SolrServerException in all constructors protects against the misuse of the class for something other than its intended purpose (e.g. to wrap a SolrServerException as a RuntimeException). It also provides an API guarantee for future clients that the cause of an UncheckedSolrException is always a SolrServerException.
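The constraint being described can be sketched with a stand-in checked exception (the nested `ServerException` below is a self-contained placeholder for SolrServerException; the pattern is modeled on java.io.UncheckedIOException):

```java
// Sketch of constraining an unchecked wrapper's cause type via its
// constructors, modeled on java.io.UncheckedIOException. The checked
// ServerException here is a placeholder for SolrServerException.
public class CauseConstraint {
    public static class ServerException extends Exception {
        public ServerException(String msg) { super(msg); }
    }

    public static class UncheckedServerException extends RuntimeException {
        public UncheckedServerException(ServerException cause) { super(cause); }
        public UncheckedServerException(String message, ServerException cause) { super(message, cause); }

        @Override
        public ServerException getCause() {
            // Safe cast: every constructor guarantees the cause type.
            return (ServerException) super.getCause();
        }
    }
}
```

Because every constructor takes the specific checked type, callers get a compile-time guarantee about `getCause()` instead of a runtime check.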
OK, I just made the change.
* @param msg Exception message
* @param cause Cause of the exception
*/
public UncheckedSolrException(String msg, Throwable cause) {
Change to public UncheckedSolrException(String message, SolrServerException cause) (all constructors should take SolrServerException).
Thanks for all the updates. There are some new test failures under Elasticsearch 1.x. Is there an easy fix? Otherwise I think we'd have to (finally/again) remove Elasticsearch 1.x from the compatibility matrix.
sed -i 's/\${es.docker.image}:\${elasticsearch.test.version}/\${es.docker.image}:1.7.6/' janusgraph-es/pom.xml
mvn clean install -pl janusgraph-es -Pes-docker -Delasticsearch.test.groovy.inline=true -Des.docker.image=elasticsearch
...
Tests run: 21, Failures: 0, Errors: 4, Skipped: 0, Time elapsed: 16.006 sec <<< FAILURE! - in org.janusgraph.diskstorage.es.ElasticSearchMutliTypeIndexTest
singleStore(org.janusgraph.diskstorage.es.ElasticSearchMutliTypeIndexTest) Time elapsed: 0.89 sec <<< ERROR!
java.io.UncheckedIOException: POST http://127.0.0.1:9200_search/scroll: HTTP/1.1 400 Bad Request
{"error":"ElasticsearchIllegalArgumentException[Failed to decode scrollId]; nested: IOException[Bad Base64 input character decimal 123 in array position 0]; ","status":400}
...
I found an easy fix. There was a breaking change in the scroll API between ES 1 and ES 2.
Thanks for finding the fix. Great contribution.
RestElasticSearchClient client = new RestElasticSearchClient(rc);
final int scrollKeepAlive = config.get(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE);
Preconditions.checkArgument(scrollKeepAlive >= 1, "Scroll Keep alive should be greater or equals than 1");
RestElasticSearchClient client = new RestElasticSearchClient(rc, scrollKeepAlive);
final
requestData = scrollId.getBytes();
} else {
path = "_search/scroll";
Map<String, Object> request = new HashMap<>();
final
Signed-off-by: David Clement <david.clement90@laposte.net>
Unless anyone wants to add their review or request more time to review, I'll merge this in 48 hours.
…endResult Stream the result from Search Backend instead of use List.
Issue: #480
I cannot use cursors for Solr because the sort clause must include the unique key field
(https://cwiki.apache.org/confluence/display/solr/Pagination+of+Results)
Signed-off-by: David Clement david.clement90@laposte.net