
Elephant Bird Lucene: Querying Indexes

For a general overview of Elephant-Bird-Lucene, see the Elephant Bird Lucene page.

The first step is to create a LuceneIndexInputFormat. This is where you specify how your Lucene indexes are searched and how the results are transformed into MapReduce values.

Here's an example that retrieves all the tweets that match the given queries from the indexes created on the Creating Indexes page.

package com.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Version;

import com.twitter.elephantbird.mapreduce.input.LuceneIndexCollectAllRecordReader;
import com.twitter.elephantbird.mapreduce.input.LuceneIndexInputFormat;

/**
 * Retrieves all the hits from each index visited
 */
public class TweetIndexInputFormat extends LuceneIndexInputFormat<TweetWritable> {
  private QueryParser parser = new QueryParser(Version.LUCENE_40,
                                               TweetIndexOutputFormat.TWEET_TEXT_FIELD,
                                               new SimpleAnalyzer(Version.LUCENE_40));

  @Override
  public RecordReader<IntWritable, TweetWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {

    // LuceneIndexCollectAllRecordReader does some of the work for you if you want to collect every
    // hit. However, if you want full control over searching, return a new LuceneIndexRecordReader
    // which will provide you with a Query and a Searcher, and you can search however you'd like
    return new LuceneIndexCollectAllRecordReader<TweetWritable>() {
      // specify how to turn a serialized query into a Query object
      @Override
      protected Query deserializeQuery(String serializedString) throws IOException {
        try {
          return parser.parse(serializedString);
        } catch (ParseException e) {
          throw new IOException(e);
        }
      }

      // specify how to turn a Document into a map reduce value
      @Override
      protected TweetWritable docToValue(Document document) {
        TweetWritable tweet = new TweetWritable();
        tweet.setText(document.get(TweetIndexOutputFormat.TWEET_TEXT_FIELD));
        tweet.setUserId(Long.valueOf(document.get(TweetIndexOutputFormat.USER_ID_FIELD)));
        return tweet;
      }
    };
  }
}

Here's an example of using a custom collector to carry out the search:

  @Override
  public RecordReader<IntWritable, TweetWritable>
    createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    return new LuceneIndexRecordReader<TweetWritable>() {

      @Override
      protected Query deserializeQuery(String serializedString) throws IOException {
        try {
          return parser.parse(serializedString);
        } catch (ParseException e) {
          throw new IOException(e);
        }
      }

      @Override
      protected List<TweetWritable> search(IndexSearcher searcher, Query query) throws IOException {
        MyCollector myCollector = new MyCollector();
        searcher.search(query, myCollector);
        return myCollector.getTweets();
      }
    };
  }

  private static class MyCollector extends Collector {

    @Override
    public void setScorer(Scorer scorer) throws IOException {
      ...
    }

    @Override
    public void collect(int doc) throws IOException {
      ...
    }

    @Override
    public void setNextReader(AtomicReaderContext context) throws IOException {
      ...
    }

    @Override
    public boolean acceptsDocsOutOfOrder() {
      ...
    }

    public List<TweetWritable> getTweets() {
      ...
    }
  }

Now that you have a LuceneIndexInputFormat, it can be used in a MapReduce job, but it will require some configuration (a sketch of this setup follows the list):

  • You will have to call LuceneIndexInputFormat.setQueries(List<String> queries, Configuration conf) to set the list of queries that will be run over the indexes. These query strings are the ones that will be passed to deserializeQuery above, so you can use a query parser as in the example, or you can serialize and deserialize your queries some other way.

  • You will have to call LuceneIndexInputFormat.setInputPaths(List<Path> paths, Configuration conf) to set which indexes to search. These paths will be searched recursively for directories containing Lucene indexes. The default is to look for directories that look like the output of LuceneIndexOutputFormat (e.g. directories that begin with 'index-'), but you may override getIndexDirPathFilter(Configuration conf) to change that behavior.

  • You may call LuceneIndexInputFormat.setMaxCombineSplitSizeBytes(long size, Configuration conf) to control how many indexes are combined into a single split (and therefore a single mapper).
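
Here's a rough sketch of what that configuration might look like in a job driver, reusing the queries and index path from the examples on this page. The job setup and the TweetMapper class are hypothetical placeholders:

// A minimal driver-side sketch; the queries, paths, and TweetMapper below are placeholders.
Configuration conf = new Configuration();

// These strings are the ones handed to deserializeQuery() in the record reader.
LuceneIndexInputFormat.setQueries(ImmutableList.of("+twitter +lucene", "+java -coffee"), conf);

// These directories are searched recursively for Lucene index directories.
LuceneIndexInputFormat.setInputPaths(ImmutableList.of(new Path("/my/indexes/")), conf);

// Optional: combine up to roughly 1GB of index data per split (and therefore per mapper).
LuceneIndexInputFormat.setMaxCombineSplitSizeBytes(1024L * 1024L * 1024L, conf);

Job job = new Job(conf, "query tweet indexes");
job.setInputFormatClass(TweetIndexInputFormat.class);
job.setMapperClass(TweetMapper.class);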

Using Pig to query indexes

To query Lucene indexes from Pig, you'll have to create a LuceneIndexLoader that wraps your LuceneIndexInputFormat and specifies how to convert a MapReduce record to a Tuple.

Continuing our example above:

package com.example;

import java.io.IOException;

import com.google.common.collect.ImmutableList;

import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.Utils;

import com.twitter.elephantbird.mapreduce.input.LuceneIndexInputFormat;
import com.twitter.elephantbird.pig.load.LuceneIndexLoader;

/**
 * Searches indexes of tweets and returns all hits as a tuple of
 * (queryId: int, tweetText: chararray, userId: long)
 */
public class TweetIndexLoader extends LuceneIndexLoader<TweetWritable> implements LoadMetadata {

  // This is important because LuceneIndexLoader will parse the arguments passed from the
  // pig script
  public TweetIndexLoader(String[] args) {
    super(args);
  }

  // Specify how to convert a key value pair from TweetIndexInputFormat into a Tuple
  @Override
  protected Tuple recordToTuple(int queryId, TweetWritable tweetWritable) {
    return TupleFactory.getInstance().newTuple(
        ImmutableList.of(queryId, tweetWritable.getText(), tweetWritable.getUserId()));
  }

  // This tells LuceneIndexLoader that you're wrapping a TweetIndexInputFormat
  @Override
  protected LuceneIndexInputFormat<TweetWritable> getLuceneIndexInputFormat() throws IOException {
    return new TweetIndexInputFormat();
  }

  @Override
  public ResourceSchema getSchema(String s, Job job) throws IOException {
    return new ResourceSchema(
        Utils.getSchemaFromString("queryId: int, tweetText: chararray, userId: long"));
  }
  ... skipping other LoadMetadata methods ...
}

It's a good idea to make your loader implement LoadMetadata, but it's not required. Now, to use this in a Pig script, there are two options. The first is to list the queries as Pig string literals like this:

tweets_of_interest = load '/my/indexes/' using TweetIndexLoader('--queries', '+twitter +lucene', '+java -coffee');

The second method is to put the queries in a local file, one per line:

tweets_of_interest = load '/my/indexes/' using TweetIndexLoader('--file', 'path/to/local/file.txt');
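
For example, file.txt could contain the same two queries from the first example, one query per line:

+twitter +lucene
+java -coffee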