Skip to content

Use a _recovery_source if source is omitted or modified #31106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
Expand All @@ -87,6 +88,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
Expand Down Expand Up @@ -1111,7 +1113,11 @@ private void duelRun(PercolateQuery.QueryStore queryStore, MemoryIndex memoryInd
}

private void addQuery(Query query, List<ParseContext.Document> docs) {
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(query, parseContext);
ParseContext.Document queryDocument = parseContext.doc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -58,6 +59,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperParser;
import org.elasticsearch.index.mapper.MapperParsingException;
Expand Down Expand Up @@ -182,7 +184,11 @@ public void testExtractTerms() throws Exception {

DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
ParseContext.Document document = parseContext.doc();
Expand All @@ -204,7 +210,7 @@ public void testExtractTerms() throws Exception {
bq.add(termQuery1, Occur.MUST);
bq.add(termQuery2, Occur.MUST);

parseContext = new ParseContext.InternalParseContext(Settings.EMPTY, mapperService.documentMapperParser(),
parseContext = new ParseContext.InternalParseContext(settings, mapperService.documentMapperParser(),
documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
document = parseContext.doc();
Expand Down Expand Up @@ -232,8 +238,12 @@ public void testExtractRanges() throws Exception {
bq.add(rangeQuery2, Occur.MUST);

DocumentMapper documentMapper = mapperService.documentMapper("doc");
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
ParseContext.Document document = parseContext.doc();
Expand All @@ -259,7 +269,7 @@ public void testExtractRanges() throws Exception {
.rangeQuery(15, 20, true, true, null, null, null, null);
bq.add(rangeQuery2, Occur.MUST);

parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
document = parseContext.doc();
Expand All @@ -283,7 +293,11 @@ public void testExtractTermsAndRanges_failed() throws Exception {
TermRangeQuery query = new TermRangeQuery("field1", new BytesRef("a"), new BytesRef("z"), true, true);
DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(query, parseContext);
ParseContext.Document document = parseContext.doc();
Expand All @@ -298,7 +312,11 @@ public void testExtractTermsAndRanges_partial() throws Exception {
PhraseQuery phraseQuery = new PhraseQuery("field", "term");
DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(phraseQuery, parseContext);
ParseContext.Document document = parseContext.doc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
Expand Down Expand Up @@ -2013,7 +2014,8 @@ private IndexWriterConfig getIndexWriterConfig() {
MergePolicy mergePolicy = config().getMergePolicy();
if (softDeleteEnabled) {
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy);
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, this::softDeletesRetentionQuery,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, a single retention query for both 💯

new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy));
}
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -196,7 +197,9 @@ private Translog.Operation readDocAsOp(int docID) throws IOException {
return null;
}
final long version = docValues[leaf.ord].docVersion(segmentDocID);
final FieldsVisitor fields = new FieldsVisitor(true);
final String sourceField = docValues[leaf.ord].hasRecoverySource(segmentDocID) ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
SourceFieldMapper.NAME;
final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
indexSearcher.doc(docID, fields);
fields.postProcess(mapperService);

Expand All @@ -218,7 +221,7 @@ private Translog.Operation readDocAsOp(int docID) throws IOException {
// TODO: pass the latest timestamp from engine.
final long autoGeneratedIdTimestamp = -1;
op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL,
source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
source == null ? null : source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
}
}
assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " +
Expand All @@ -240,6 +243,7 @@ private static final class CombinedDocValues {
private NumericDocValues seqNoDV;
private NumericDocValues primaryTermDV;
private NumericDocValues tombstoneDV;
private NumericDocValues recoverySource;

CombinedDocValues(LeafReader leafReader) throws IOException {
this.leafReader = leafReader;
Expand All @@ -248,6 +252,7 @@ private static final class CombinedDocValues {
this.primaryTermDV = Objects.requireNonNull(
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
}

long docVersion(int segmentDocId) throws IOException {
Expand Down Expand Up @@ -293,5 +298,15 @@ boolean isTombstone(int segmentDocId) throws IOException {
}
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
}

boolean hasRecoverySource(int segmentDocId) throws IOException {
if (recoverySource == null) {
return false;
}
if (recoverySource.docID() > segmentDocId) {
recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
}
return recoverySource.advanceExact(segmentDocId);
}
}
}
Loading