Elephant Bird Lucene: Querying Indexes
For the general overview of Elephant-Bird-Lucene, see Elephant Bird Lucene.
The first step is to create a `LuceneIndexInputFormat`. This is where you specify how your Lucene indexes are searched and how the results are transformed into map-reduce values.
Here's an example that retrieves all the tweets that match the given queries from the indexes created on the Creating Indexes page.
```java
package com.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Version;

import com.twitter.elephantbird.mapreduce.input.LuceneIndexCollectAllRecordReader;
import com.twitter.elephantbird.mapreduce.input.LuceneIndexInputFormat;

/**
 * Retrieves all the hits from each index visited
 */
public class TweetIndexInputFormat extends LuceneIndexInputFormat<TweetWritable> {

  private QueryParser parser = new QueryParser(Version.LUCENE_40,
      TweetIndexOutputFormat.TWEET_TEXT_FIELD,
      new SimpleAnalyzer(Version.LUCENE_40));

  @Override
  public RecordReader<IntWritable, TweetWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {

    // LuceneIndexCollectAllRecordReader does some of the work for you if you want to collect every
    // hit. However, if you want full control over searching, return a new LuceneIndexRecordReader
    // which will provide you with a Query and a Searcher, and you can search however you'd like
    return new LuceneIndexCollectAllRecordReader<TweetWritable>() {

      // specify how to turn a serialized query into a Query object
      @Override
      protected Query deserializeQuery(String serializedString) throws IOException {
        try {
          return parser.parse(serializedString);
        } catch (ParseException e) {
          throw new IOException(e);
        }
      }

      // specify how to turn a Document into a map reduce value
      @Override
      protected TweetWritable docToValue(Document document) {
        TweetWritable tweet = new TweetWritable();
        tweet.setText(document.get(TweetIndexOutputFormat.TWEET_TEXT_FIELD));
        tweet.setUserId(Long.valueOf(document.get(TweetIndexOutputFormat.USER_ID_FIELD)));
        return tweet;
      }
    };
  }
}
```
Here's an example of using a custom collector to carry out the search:
```java
@Override
public RecordReader<IntWritable, TweetWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  return new LuceneIndexRecordReader<TweetWritable>() {

    @Override
    protected Query deserializeQuery(String serializedString) throws IOException {
      try {
        return parser.parse(serializedString);
      } catch (ParseException e) {
        throw new IOException(e);
      }
    }

    @Override
    protected List<TweetWritable> search(IndexSearcher searcher, Query query) throws IOException {
      MyCollector myCollector = new MyCollector();
      searcher.search(query, myCollector);
      return myCollector.getTweets();
    }
  };
}

private static class MyCollector extends Collector {

  @Override
  public void setScorer(Scorer scorer) throws IOException {
    ...
  }

  @Override
  public void collect(int doc) throws IOException {
    ...
  }

  @Override
  public void setNextReader(AtomicReaderContext context) throws IOException {
    ...
  }

  @Override
  public boolean acceptsDocsOutOfOrder() {
    ...
  }

  public List<TweetWritable> getTweets() {
    ...
  }
}
```
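The `...` bodies above are left for you to fill in. As one illustrative (not authoritative) way to complete them, the sketch below reuses the document-to-`TweetWritable` conversion from `docToValue` in the first example and assumes the Lucene 4.x `Collector` API:

```java
// a minimal sketch; assumes Lucene 4.x classes: org.apache.lucene.search.Collector/Scorer,
// org.apache.lucene.index.AtomicReader/AtomicReaderContext
private static class MyCollector extends Collector {

  private final List<TweetWritable> tweets = new ArrayList<TweetWritable>();
  private AtomicReader currentReader;

  @Override
  public void setScorer(Scorer scorer) throws IOException {
    // scores aren't used; every hit is collected regardless of score
  }

  @Override
  public void collect(int doc) throws IOException {
    // load the matching document from the current segment and convert it,
    // mirroring docToValue in the first example
    Document document = currentReader.document(doc);
    TweetWritable tweet = new TweetWritable();
    tweet.setText(document.get(TweetIndexOutputFormat.TWEET_TEXT_FIELD));
    tweet.setUserId(Long.valueOf(document.get(TweetIndexOutputFormat.USER_ID_FIELD)));
    tweets.add(tweet);
  }

  @Override
  public void setNextReader(AtomicReaderContext context) throws IOException {
    currentReader = context.reader();
  }

  @Override
  public boolean acceptsDocsOutOfOrder() {
    // hits are just appended to a list, so out-of-order collection is fine
    return true;
  }

  public List<TweetWritable> getTweets() {
    return tweets;
  }
}
```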
Now that you have a `LuceneIndexInputFormat`, it can be used in a map-reduce job, but it will require some configuration:
- You will have to call `LuceneIndexInputFormat.setQueries(List<String> queries, Configuration conf)` to set the list of queries that will be run over the indexes. These query strings are the ones that will be passed to `deserializeQuery` above, so you can use a query parser if you'd like, as in the example, or you could serialize and deserialize your queries some other way.
- You will have to call `LuceneIndexInputFormat.setInputPaths(List<Path> paths, Configuration conf)` to set which indexes to search. These paths will be searched recursively for directories containing Lucene indexes. The default is to look for directories that look like the output of `LuceneIndexOutputFormat` (e.g. directories that begin with 'index-'), but you may override `getIndexDirPathFilter(Configuration conf)` to change that behavior.
- You may call `LuceneIndexInputFormat.setMaxCombineSplitSizeBytes(long size, Configuration conf)` to control how many indexes are combined into a single split (and therefore a single mapper); see the driver sketch below.
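Putting these together in a driver might look something like the following. This is just a sketch: the job name, paths, and query strings are illustrative assumptions, but the three `LuceneIndexInputFormat` calls are the ones described above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import com.google.common.collect.ImmutableList;
import com.twitter.elephantbird.mapreduce.input.LuceneIndexInputFormat;

// snippet; assumes a surrounding driver/main method
Configuration conf = new Configuration();
Job job = new Job(conf, "search-tweet-indexes");
job.setInputFormatClass(TweetIndexInputFormat.class);

// the query strings set here are what gets passed to deserializeQuery above
LuceneIndexInputFormat.setQueries(
    ImmutableList.of("+twitter +lucene", "+java -coffee"), job.getConfiguration());

// roots that will be searched recursively for index directories
LuceneIndexInputFormat.setInputPaths(
    ImmutableList.of(new Path("/my/indexes/")), job.getConfiguration());

// optional: combine up to ~512MB worth of indexes into each split (and mapper)
LuceneIndexInputFormat.setMaxCombineSplitSizeBytes(512L * 1024 * 1024, job.getConfiguration());
```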
To query Lucene indexes from Pig, you'll have to create a `LuceneIndexLoader` that wraps your `LuceneIndexInputFormat` and specifies how to convert a map-reduce record to a Tuple. Continuing our example above:
```java
package com.example;

import java.io.IOException;

import com.google.common.collect.ImmutableList;

import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.Utils;

import com.twitter.elephantbird.mapreduce.input.LuceneIndexInputFormat;
import com.twitter.elephantbird.pig.load.LuceneIndexLoader;

/**
 * Searches indexes of tweets and returns all hits as a tuple of
 * (queryId: int, tweetText: chararray, userId: long)
 */
public class TweetIndexLoader extends LuceneIndexLoader<TweetWritable> implements LoadMetadata {

  // This is important because LuceneIndexLoader will parse the arguments passed from the
  // pig script
  public TweetIndexLoader(String[] args) {
    super(args);
  }

  // Specify how to convert a key value pair from TweetIndexInputFormat into a Tuple
  @Override
  protected Tuple recordToTuple(int queryId, TweetWritable tweetWritable) {
    return TupleFactory.getInstance().newTuple(
        ImmutableList.of(queryId, tweetWritable.getText(), tweetWritable.getUserId()));
  }

  // This tells LuceneIndexLoader that you're wrapping a TweetIndexInputFormat
  @Override
  protected LuceneIndexInputFormat<TweetWritable> getLuceneIndexInputFormat() throws IOException {
    return new TweetIndexInputFormat();
  }

  @Override
  public ResourceSchema getSchema(String s, Job job) throws IOException {
    return new ResourceSchema(
        Utils.getSchemaFromString("queryId: int, tweetText: chararray, userId: long"));
  }

  // ... skipping other LoadMetadata methods ...
}
```
It's a good idea to make your loader implement `LoadMetadata`, but it's not required. Now, to use this in a Pig script, there are two options. The first is to list the queries as Pig string literals, like this:
```pig
tweets_of_interest = load '/my/indexes/' using TweetIndexLoader('--queries', '+twitter +lucene', '+java -coffee');
```
The second method is to put the queries in a local file, one per line:
```pig
tweets_of_interest = load '/my/indexes/' using TweetIndexLoader('--file', 'path/to/local/file.txt');
```
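For instance, a hypothetical `file.txt` holding the same two queries as the first option would contain:

```
+twitter +lucene
+java -coffee
```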