Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream the result from Search Backend instead of use List. #481

Merged
merged 1 commit into from
Sep 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.janusgraph.core;

import org.janusgraph.core.schema.Parameter;
import java.util.stream.Stream;
import org.apache.tinkerpop.gremlin.structure.Element;

/**
Expand Down Expand Up @@ -82,24 +83,54 @@ public interface JanusGraphIndexQuery {
/**
* Returns all vertices that match the query in the indexing backend.
*
* @deprecated use {@link #vertexStream()} instead.
*
* @return
*/
@Deprecated
public Iterable<Result<JanusGraphVertex>> vertices();

/**
* Returns all vertices that match the query in the indexing backend.
*
* @return
*/
public Stream<Result<JanusGraphVertex>> vertexStream();

/**
* Returns all edges that match the query in the indexing backend.
*
* @deprecated use {@link #edgeStream()} instead.
*
* @return
*/
@Deprecated
public Iterable<Result<JanusGraphEdge>> edges();

/**
* Returns all edges that match the query in the indexing backend.
*
* @return
*/
public Stream<Result<JanusGraphEdge>> edgeStream();

/**
* Returns all properties that match the query in the indexing backend.
*
* @deprecated use {@link #propertyStream()} instead.
*
* @return
*/
@Deprecated
public Iterable<Result<JanusGraphVertexProperty>> properties();

/**
* Returns all properties that match the query in the indexing backend.
*
* @return
*/
public Stream<Result<JanusGraphVertexProperty>> propertyStream();

/**
* Returns total vertices that match the query in the indexing backend ignoring limit and offset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.janusgraph.diskstorage.keycolumnvalue.cache.KCVSCache;
import org.janusgraph.diskstorage.log.kcvs.ExternalCachePersistor;
Expand Down Expand Up @@ -408,12 +409,12 @@ public String toString() {
}


public List<String> indexQuery(final String index, final IndexQuery query) {
public Stream<String> indexQuery(final String index, final IndexQuery query) {
final IndexTransaction indexTx = getIndexTransaction(index);
return executeRead(new Callable<List<String>>() {
return executeRead(new Callable<Stream<String>>() {
@Override
public List<String> call() throws Exception {
return indexTx.query(query);
public Stream<String> call() throws Exception {
return indexTx.queryStream(query);
}

@Override
Expand All @@ -423,12 +424,12 @@ public String toString() {
});
}

public Iterable<RawQuery.Result<String>> rawQuery(final String index, final RawQuery query) {
public Stream<RawQuery.Result<String>> rawQuery(final String index, final RawQuery query) {
final IndexTransaction indexTx = getIndexTransaction(index);
return executeRead(new Callable<Iterable<RawQuery.Result<String>>>() {
return executeRead(new Callable<Stream<RawQuery.Result<String>>>() {
@Override
public Iterable<RawQuery.Result<String>> call() throws Exception {
return indexTx.query(query);
public Stream<RawQuery.Result<String>> call() throws Exception {
return indexTx.queryStream(query);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
* External index for querying.
Expand Down Expand Up @@ -83,7 +84,7 @@ public interface IndexProvider extends IndexInformation {
* @throws org.janusgraph.diskstorage.BackendException
* @see IndexQuery
*/
public List<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;
public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;


/**
Expand All @@ -96,7 +97,7 @@ public interface IndexProvider extends IndexInformation {
* @throws org.janusgraph.diskstorage.BackendException
* @see RawQuery
*/
public Iterable<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;
public Stream<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;

/**
* Executes the given raw query against the index and returns the total hits. e.g. limit=0
Expand All @@ -110,7 +111,6 @@ public interface IndexProvider extends IndexInformation {
*/
public Long totals(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException;


/**
* Returns a transaction handle for a new index transaction.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import org.janusgraph.diskstorage.util.BackendOperation;
import org.janusgraph.graphdb.database.idhandling.VariableLong;
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.janusgraph.graphdb.util.StreamIterable;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Wraps the transaction handle of an index and buffers all mutations against an index for efficiency.
Expand Down Expand Up @@ -98,11 +101,27 @@ public void register(String store, String key, KeyInformation information) throw
index.register(store,key,information,indexTx);
}

/**
* @deprecated use {@link #queryStream(IndexQuery query)} instead.
*/
@Deprecated
public List<String> query(IndexQuery query) throws BackendException {
return queryStream(query).collect(Collectors.toList());
}

public Stream<String> queryStream(IndexQuery query) throws BackendException {
return index.query(query,keyInformations,indexTx);
}

/**
* @deprecated use {@link #queryStream(RawQuery query)} instead.
*/
@Deprecated
public Iterable<RawQuery.Result<String>> query(RawQuery query) throws BackendException {
return new StreamIterable<>(index.query(query,keyInformations,indexTx));
}

public Stream<RawQuery.Result<String>> queryStream(RawQuery query) throws BackendException {
return index.query(query,keyInformations,indexTx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,8 +892,9 @@ public boolean apply(@Nullable Integer uniqueIdBitWidth) {
ConfigOption.Type.MASKABLE, String.class);

public static final ConfigOption<Integer> INDEX_MAX_RESULT_SET_SIZE = new ConfigOption<Integer>(INDEX_NS, "max-result-set-size",
"Maxium number of results to return if no limit is specified",
ConfigOption.Type.MASKABLE, 100000);
"Maxium number of results to return if no limit is specified. For index backends that support scrolling, it represents " +
"the number of results in each batch",
ConfigOption.Type.MASKABLE, 50);

public static final ConfigOption<Boolean> INDEX_NAME_MAPPING = new ConfigOption<Boolean>(INDEX_NS,"map-name",
"Whether to use the name of the property key as the field name in the index. It must be ensured, that the" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME_MAPPING;

Expand Down Expand Up @@ -522,16 +523,16 @@ private static void indexMatches(JanusGraphVertex vertex, RecordEntry[] current,
Querying
################################################### */

public List<Object> query(final JointIndexQuery.Subquery query, final BackendTransaction tx) {
IndexType index = query.getIndex();
public Stream<Object> query(final JointIndexQuery.Subquery query, final BackendTransaction tx) {
final IndexType index = query.getIndex();
if (index.isCompositeIndex()) {
MultiKeySliceQuery sq = query.getCompositeQuery();
List<EntryList> rs = sq.execute(tx);
List<Object> results = new ArrayList<Object>(rs.get(0).size());
final MultiKeySliceQuery sq = query.getCompositeQuery();
final List<EntryList> rs = sq.execute(tx);
final List<Object> results = new ArrayList<>(rs.get(0).size());
for (EntryList r : rs) {
for (java.util.Iterator<Entry> iterator = r.reuseIterator(); iterator.hasNext(); ) {
Entry entry = iterator.next();
ReadBuffer entryValue = entry.asReadBuffer();
final Entry entry = iterator.next();
final ReadBuffer entryValue = entry.asReadBuffer();
entryValue.movePositionTo(entry.getValuePosition());
switch(index.getElement()) {
case VERTEX:
Expand All @@ -542,12 +543,9 @@ public List<Object> query(final JointIndexQuery.Subquery query, final BackendTra
}
}
}
return results;
return results.stream();
} else {
List<String> r = tx.indexQuery(((MixedIndexType) index).getBackingIndexName(), query.getMixedQuery());
List<Object> result = new ArrayList<Object>(r.size());
for (String id : r) result.add(string2ElementId(id));
return result;
return tx.indexQuery(((MixedIndexType) index).getBackingIndexName(), query.getMixedQuery()).map(IndexSerializer::string2ElementId);
}
}

Expand Down Expand Up @@ -581,36 +579,7 @@ public Condition<JanusGraphElement> apply(@Nullable Condition<JanusGraphElement>
}
return new IndexQuery(index.getStoreName(), newCondition, newOrders);
}
//
//
//
// public IndexQuery getQuery(String index, final ElementCategory resultType, final Condition condition, final OrderList orders) {
// if (isStandardIndex(index)) {
// Preconditions.checkArgument(orders.isEmpty());
// return new IndexQuery(getStoreName(resultType), condition, IndexQuery.NO_ORDER);
// } else {
// Condition newCondition = ConditionUtil.literalTransformation(condition,
// new Function<Condition<JanusGraphElement>, Condition<JanusGraphElement>>() {
// @Nullable
// @Override
// public Condition<JanusGraphElement> apply(@Nullable Condition<JanusGraphElement> condition) {
// Preconditions.checkArgument(condition instanceof PredicateCondition);
// PredicateCondition pc = (PredicateCondition) condition;
// JanusGraphKey key = (JanusGraphKey) pc.getKey();
// return new PredicateCondition<String, JanusGraphElement>(key2Field(key), pc.getPredicate(), pc.getValue());
// }
// });
// ImmutableList<IndexQuery.OrderEntry> newOrders = IndexQuery.NO_ORDER;
// if (!orders.isEmpty()) {
// ImmutableList.Builder<IndexQuery.OrderEntry> lb = ImmutableList.builder();
// for (int i = 0; i < orders.size(); i++) {
// lb.add(new IndexQuery.OrderEntry(key2Field(orders.getKey(i)), orders.getOrder(i), orders.getKey(i).getDataType()));
// }
// newOrders = lb.build();
// }
// return new IndexQuery(getStoreName(resultType), newCondition, newOrders);
// }
// }


/* ################################################
Common code used by executeQuery and executeTotals
Expand Down Expand Up @@ -674,20 +643,14 @@ else if (transaction.containsRelationType(keyname)) {
return queryStr;
}

public Iterable<RawQuery.Result> executeQuery(IndexQueryBuilder query, final ElementCategory resultType,
public Stream<RawQuery.Result> executeQuery(IndexQueryBuilder query, final ElementCategory resultType,
final BackendTransaction backendTx, final StandardJanusGraphTx transaction) {
final MixedIndexType index = getMixedIndex(query.getIndex(), transaction);
final String queryStr = createQueryString(query, resultType, transaction, index);
final RawQuery rawQuery = new RawQuery(index.getStoreName(),queryStr,query.getParameters());
if (query.hasLimit()) rawQuery.setLimit(query.getLimit());
rawQuery.setOffset(query.getOffset());
return Iterables.transform(backendTx.rawQuery(index.getBackingIndexName(), rawQuery), new Function<RawQuery.Result<String>, RawQuery.Result>() {
@Nullable
@Override
public RawQuery.Result apply(@Nullable RawQuery.Result<String> result) {
return new RawQuery.Result(string2ElementId(result.getResult()), result.getScore());
}
});
return backendTx.rawQuery(index.getBackingIndexName(), rawQuery).map(result -> new RawQuery.Result(string2ElementId(result.getResult()), result.getScore()));
}

public Long executeTotals(IndexQueryBuilder query, final ElementCategory resultType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,23 @@

package org.janusgraph.graphdb.query.graph;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.janusgraph.core.*;
import org.janusgraph.core.schema.Parameter;
import org.janusgraph.diskstorage.indexing.RawQuery;
import org.janusgraph.graphdb.database.IndexSerializer;
import org.janusgraph.graphdb.internal.ElementCategory;
import org.janusgraph.graphdb.query.BaseQuery;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.util.StreamIterable;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.List;
import java.util.stream.Stream;

/**
* Implementation of {@link JanusGraphIndexQuery} for string based queries that are issued directly against the specified
Expand Down Expand Up @@ -184,25 +182,12 @@ public IndexQueryBuilder addParameters(Parameter... paras) {
return this;
}

private Iterable<Result<JanusGraphElement>> execute(ElementCategory resultType) {
private <E extends JanusGraphElement> Stream<Result<E>> execute(ElementCategory resultType, Class<E> resultClass) {
Preconditions.checkNotNull(indexName);
Preconditions.checkNotNull(query);
if (tx.hasModifications())
log.warn("Modifications in this transaction might not be accurately reflected in this index query: {}",query);
Iterable<RawQuery.Result> result = serializer.executeQuery(this,resultType,tx.getTxHandle(),tx);
final Function<Object, ? extends JanusGraphElement> conversionFct = tx.getConversionFunction(resultType);
return Iterables.filter(Iterables.transform(result, new Function<RawQuery.Result, Result<JanusGraphElement>>() {
@Nullable
@Override
public Result<JanusGraphElement> apply(@Nullable RawQuery.Result result) {
return new ResultImpl<JanusGraphElement>(conversionFct.apply(result.getResult()),result.getScore());
}
}),new Predicate<Result<JanusGraphElement>>() {
@Override
public boolean apply(@Nullable Result<JanusGraphElement> r) {
return !r.getElement().isRemoved();
}
});
return serializer.executeQuery(this, resultType, tx.getTxHandle(),tx).map(r -> (Result<E>) new ResultImpl<>(tx.getConversionFunction(resultType).apply(r.getResult()), r.getScore())).filter(r -> !r.getElement().isRemoved());
}

private Long executeTotals(ElementCategory resultType) {
Expand All @@ -216,20 +201,35 @@ private Long executeTotals(ElementCategory resultType) {

@Override
public Iterable<Result<JanusGraphVertex>> vertices() {
return new StreamIterable<>(vertexStream());
}

@Override
public Stream<Result<JanusGraphVertex>> vertexStream() {
setPrefixInternal(VERTEX_PREFIX);
return (Iterable)execute(ElementCategory.VERTEX);
return execute(ElementCategory.VERTEX, JanusGraphVertex.class);
}

@Override
public Iterable<Result<JanusGraphEdge>> edges() {
return new StreamIterable<>(edgeStream());
}

@Override
public Stream<Result<JanusGraphEdge>> edgeStream() {
setPrefixInternal(EDGE_PREFIX);
return (Iterable)execute(ElementCategory.EDGE);
return execute(ElementCategory.EDGE, JanusGraphEdge.class);
}

@Override
public Iterable<Result<JanusGraphVertexProperty>> properties() {
return new StreamIterable<>(propertyStream());
}

@Override
public Stream<Result<JanusGraphVertexProperty>> propertyStream() {
setPrefixInternal(PROPERTY_PREFIX);
return (Iterable)execute(ElementCategory.PROPERTY);
return execute(ElementCategory.PROPERTY, JanusGraphVertexProperty.class);
}

@Override
Expand Down
Loading