Skip to content

Commit

Permalink
Stream the result from Search Backend instead of use List.
Browse files Browse the repository at this point in the history
Signed-off-by: David Clement <david.clement90@laposte.net>
  • Loading branch information
davidclement90 committed Sep 4, 2017
1 parent b5f0e2a commit 43755cf
Show file tree
Hide file tree
Showing 32 changed files with 961 additions and 400 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.janusgraph.core;

import org.janusgraph.core.schema.Parameter;
import java.util.stream.Stream;
import org.apache.tinkerpop.gremlin.structure.Element;

/**
Expand Down Expand Up @@ -82,24 +83,54 @@ public interface JanusGraphIndexQuery {
/**
* Returns all vertices that match the query in the indexing backend.
*
* @deprecated use {@link #vertexStream()} instead.
*
* @return
*/
@Deprecated
public Iterable<Result<JanusGraphVertex>> vertices();

/**
* Returns all vertices that match the query in the indexing backend.
*
* @return
*/
public Stream<Result<JanusGraphVertex>> vertexStream();

/**
* Returns all edges that match the query in the indexing backend.
*
* @deprecated use {@link #edgeStream()} instead.
*
* @return
*/
@Deprecated
public Iterable<Result<JanusGraphEdge>> edges();

/**
* Returns all edges that match the query in the indexing backend.
*
* @return
*/
public Stream<Result<JanusGraphEdge>> edgeStream();

/**
* Returns all properties that match the query in the indexing backend.
*
* @deprecated use {@link #propertyStream()} instead.
*
* @return
*/
@Deprecated
public Iterable<Result<JanusGraphVertexProperty>> properties();

/**
* Returns all properties that match the query in the indexing backend.
*
* @return
*/
public Stream<Result<JanusGraphVertexProperty>> propertyStream();

/**
* Returns total vertices that match the query in the indexing backend ignoring limit and offset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.janusgraph.diskstorage.keycolumnvalue.cache.KCVSCache;
import org.janusgraph.diskstorage.log.kcvs.ExternalCachePersistor;
Expand Down Expand Up @@ -408,12 +409,12 @@ public String toString() {
}


public List<String> indexQuery(final String index, final IndexQuery query) {
public Stream<String> indexQuery(final String index, final IndexQuery query) {
final IndexTransaction indexTx = getIndexTransaction(index);
return executeRead(new Callable<List<String>>() {
return executeRead(new Callable<Stream<String>>() {
@Override
public List<String> call() throws Exception {
return indexTx.query(query);
public Stream<String> call() throws Exception {
return indexTx.queryStream(query);
}

@Override
Expand All @@ -423,12 +424,12 @@ public String toString() {
});
}

public Iterable<RawQuery.Result<String>> rawQuery(final String index, final RawQuery query) {
public Stream<RawQuery.Result<String>> rawQuery(final String index, final RawQuery query) {
final IndexTransaction indexTx = getIndexTransaction(index);
return executeRead(new Callable<Iterable<RawQuery.Result<String>>>() {
return executeRead(new Callable<Stream<RawQuery.Result<String>>>() {
@Override
public Iterable<RawQuery.Result<String>> call() throws Exception {
return indexTx.query(query);
public Stream<RawQuery.Result<String>> call() throws Exception {
return indexTx.queryStream(query);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage;

/**
*
* @author davidclement@laposte.net
*
*/
public class UncheckedIOException extends RuntimeException {

private static final long serialVersionUID = -4416688826691426497L;

/**
* @param msg Exception message
*/
public UncheckedIOException(String msg) {
super(msg);
}

/**
* @param msg Exception message
* @param cause Cause of the exception
*/
public UncheckedIOException(String msg, Throwable cause) {
super(msg, cause);
}

/**
* Constructs an exception with a generic message
*
* @param cause Cause of the exception
*/
public UncheckedIOException(Throwable cause) {
this("Unckecked IO failure in storage backend", cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
* External index for querying.
Expand Down Expand Up @@ -83,7 +84,7 @@ public interface IndexProvider extends IndexInformation {
* @throws org.janusgraph.diskstorage.BackendException
* @see IndexQuery
*/
public List<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;
public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;


/**
Expand All @@ -96,7 +97,7 @@ public interface IndexProvider extends IndexInformation {
* @throws org.janusgraph.diskstorage.BackendException
* @see RawQuery
*/
public Iterable<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;
public Stream<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;

/**
* Executes the given raw query against the index and returns the total hits. e.g. limit=0
Expand All @@ -110,7 +111,6 @@ public interface IndexProvider extends IndexInformation {
*/
public Long totals(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;


/**
* Returns a transaction handle for a new index transaction.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import org.janusgraph.diskstorage.util.BackendOperation;
import org.janusgraph.graphdb.database.idhandling.VariableLong;
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.janusgraph.graphdb.util.StreamIterable;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Wraps the transaction handle of an index and buffers all mutations against an index for efficiency.
Expand Down Expand Up @@ -98,11 +101,27 @@ public void register(String store, String key, KeyInformation information) throw
index.register(store,key,information,indexTx);
}

/**
* @deprecated use {@link #queryStream(IndexQuery query)} instead.
*/
@Deprecated
public List<String> query(IndexQuery query) throws BackendException {
return queryStream(query).collect(Collectors.toList());
}

public Stream<String> queryStream(IndexQuery query) throws BackendException {
return index.query(query,keyInformations,indexTx);
}

/**
* @deprecated use {@link #queryStream(RawQuery query)} instead.
*/
@Deprecated
public Iterable<RawQuery.Result<String>> query(RawQuery query) throws BackendException {
return new StreamIterable<>(index.query(query,keyInformations,indexTx));
}

public Stream<RawQuery.Result<String>> queryStream(RawQuery query) throws BackendException {
return index.query(query,keyInformations,indexTx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,8 +892,9 @@ public boolean apply(@Nullable Integer uniqueIdBitWidth) {
ConfigOption.Type.MASKABLE, String.class);

public static final ConfigOption<Integer> INDEX_MAX_RESULT_SET_SIZE = new ConfigOption<Integer>(INDEX_NS, "max-result-set-size",
"Maxium number of results to return if no limit is specified",
ConfigOption.Type.MASKABLE, 100000);
"Maxium number of results to return if no limit is specified. For index backends that support scrolling, it represents " +
"the number of results in each batch",
ConfigOption.Type.MASKABLE, 50);

public static final ConfigOption<Boolean> INDEX_NAME_MAPPING = new ConfigOption<Boolean>(INDEX_NS,"map-name",
"Whether to use the name of the property key as the field name in the index. It must be ensured, that the" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME_MAPPING;

Expand Down Expand Up @@ -522,16 +523,16 @@ private static void indexMatches(JanusGraphVertex vertex, RecordEntry[] current,
Querying
################################################### */

public List<Object> query(final JointIndexQuery.Subquery query, final BackendTransaction tx) {
IndexType index = query.getIndex();
public Stream<Object> query(final JointIndexQuery.Subquery query, final BackendTransaction tx) {
final IndexType index = query.getIndex();
if (index.isCompositeIndex()) {
MultiKeySliceQuery sq = query.getCompositeQuery();
List<EntryList> rs = sq.execute(tx);
List<Object> results = new ArrayList<Object>(rs.get(0).size());
final MultiKeySliceQuery sq = query.getCompositeQuery();
final List<EntryList> rs = sq.execute(tx);
final List<Object> results = new ArrayList<>(rs.get(0).size());
for (EntryList r : rs) {
for (java.util.Iterator<Entry> iterator = r.reuseIterator(); iterator.hasNext(); ) {
Entry entry = iterator.next();
ReadBuffer entryValue = entry.asReadBuffer();
final Entry entry = iterator.next();
final ReadBuffer entryValue = entry.asReadBuffer();
entryValue.movePositionTo(entry.getValuePosition());
switch(index.getElement()) {
case VERTEX:
Expand All @@ -542,12 +543,9 @@ public List<Object> query(final JointIndexQuery.Subquery query, final BackendTra
}
}
}
return results;
return results.stream();
} else {
List<String> r = tx.indexQuery(((MixedIndexType) index).getBackingIndexName(), query.getMixedQuery());
List<Object> result = new ArrayList<Object>(r.size());
for (String id : r) result.add(string2ElementId(id));
return result;
return tx.indexQuery(((MixedIndexType) index).getBackingIndexName(), query.getMixedQuery()).map(IndexSerializer::string2ElementId);
}
}

Expand Down Expand Up @@ -581,36 +579,7 @@ public Condition<JanusGraphElement> apply(@Nullable Condition<JanusGraphElement>
}
return new IndexQuery(index.getStoreName(), newCondition, newOrders);
}
//
//
//
// public IndexQuery getQuery(String index, final ElementCategory resultType, final Condition condition, final OrderList orders) {
// if (isStandardIndex(index)) {
// Preconditions.checkArgument(orders.isEmpty());
// return new IndexQuery(getStoreName(resultType), condition, IndexQuery.NO_ORDER);
// } else {
// Condition newCondition = ConditionUtil.literalTransformation(condition,
// new Function<Condition<JanusGraphElement>, Condition<JanusGraphElement>>() {
// @Nullable
// @Override
// public Condition<JanusGraphElement> apply(@Nullable Condition<JanusGraphElement> condition) {
// Preconditions.checkArgument(condition instanceof PredicateCondition);
// PredicateCondition pc = (PredicateCondition) condition;
// JanusGraphKey key = (JanusGraphKey) pc.getKey();
// return new PredicateCondition<String, JanusGraphElement>(key2Field(key), pc.getPredicate(), pc.getValue());
// }
// });
// ImmutableList<IndexQuery.OrderEntry> newOrders = IndexQuery.NO_ORDER;
// if (!orders.isEmpty()) {
// ImmutableList.Builder<IndexQuery.OrderEntry> lb = ImmutableList.builder();
// for (int i = 0; i < orders.size(); i++) {
// lb.add(new IndexQuery.OrderEntry(key2Field(orders.getKey(i)), orders.getOrder(i), orders.getKey(i).getDataType()));
// }
// newOrders = lb.build();
// }
// return new IndexQuery(getStoreName(resultType), newCondition, newOrders);
// }
// }


/* ################################################
Common code used by executeQuery and executeTotals
Expand Down Expand Up @@ -674,20 +643,14 @@ else if (transaction.containsRelationType(keyname)) {
return queryStr;
}

public Iterable<RawQuery.Result> executeQuery(IndexQueryBuilder query, final ElementCategory resultType,
public Stream<RawQuery.Result> executeQuery(IndexQueryBuilder query, final ElementCategory resultType,
final BackendTransaction backendTx, final StandardJanusGraphTx transaction) {
final MixedIndexType index = getMixedIndex(query.getIndex(), transaction);
final String queryStr = createQueryString(query, resultType, transaction, index);
final RawQuery rawQuery = new RawQuery(index.getStoreName(),queryStr,query.getParameters());
if (query.hasLimit()) rawQuery.setLimit(query.getLimit());
rawQuery.setOffset(query.getOffset());
return Iterables.transform(backendTx.rawQuery(index.getBackingIndexName(), rawQuery), new Function<RawQuery.Result<String>, RawQuery.Result>() {
@Nullable
@Override
public RawQuery.Result apply(@Nullable RawQuery.Result<String> result) {
return new RawQuery.Result(string2ElementId(result.getResult()), result.getScore());
}
});
return backendTx.rawQuery(index.getBackingIndexName(), rawQuery).map(result -> new RawQuery.Result(string2ElementId(result.getResult()), result.getScore()));
}

public Long executeTotals(IndexQueryBuilder query, final ElementCategory resultType,
Expand Down
Loading

0 comments on commit 43755cf

Please sign in to comment.