-
Notifications
You must be signed in to change notification settings - Fork 115
Modify getCandidateIndex for hybrid scan #153
Changes from all commits
e7fd0bb
11aaee6
a519f16
cdc5c2d
cecacd7
b2b9605
667ce55
ea81d3c
a7f2c3f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,10 +19,10 @@ package com.microsoft.hyperspace.index.rules | |
import scala.collection.mutable | ||
|
||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
import org.apache.spark.sql.execution.datasources.LogicalRelation | ||
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} | ||
|
||
import com.microsoft.hyperspace.actions.Constants | ||
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexManager, LogicalPlanSignatureProvider} | ||
import com.microsoft.hyperspace.index.{FileInfo, IndexLogEntry, IndexManager, LogicalPlanSignatureProvider, PlanSignatureProvider} | ||
|
||
object RuleUtils { | ||
|
||
|
@@ -31,9 +31,13 @@ object RuleUtils { | |
* | ||
* @param indexManager indexManager | ||
* @param plan logical plan | ||
* @param hybridScanEnabled Flag that checks if hybrid scan is enabled or disabled. | ||
* @return indexes built for this plan | ||
*/ | ||
def getCandidateIndexes(indexManager: IndexManager, plan: LogicalPlan): Seq[IndexLogEntry] = { | ||
def getCandidateIndexes( | ||
indexManager: IndexManager, | ||
plan: LogicalPlan, | ||
hybridScanEnabled: Boolean = false): Seq[IndexLogEntry] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can we remove default value? Rationale: this is an user-facing config, which is set to false by default. If we later change the default to true, we need to remember to update this to true as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's temporary for this PR - I'll remove the default value assignment & use the config value instead in the next pr. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. In that case, please leave a review comment on your intention. That helps reviewers. |
||
// Map of a signature provider to a signature generated for the given plan. | ||
val signatureMap = mutable.Map[String, Option[String]]() | ||
|
||
|
@@ -51,11 +55,41 @@ object RuleUtils { | |
} | ||
} | ||
|
||
def isHybridScanCandidate(entry: IndexLogEntry, filesByRelations: Seq[FileInfo]): Boolean = { | ||
// TODO: Some threshold about the similarity of source data files - number of common files or | ||
// total size of common files. | ||
// See https://github.com/microsoft/hyperspace/issues/159 | ||
// TODO: As in [[PlanSignatureProvider]], Source plan signature comparison is required to | ||
// support arbitrary source plans at index creation. | ||
// See https://github.com/microsoft/hyperspace/issues/158 | ||
|
||
// Find a common file between the input relation & index source files. | ||
// Without the threshold described above, we can utilize exists & contain functions here. | ||
filesByRelations.exists(entry.allSourceFileInfos.contains) | ||
} | ||
|
||
// TODO: the following check only considers indexes in ACTIVE state for usage. Update | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I remember there is already an issue for this. If so, can we just link to the issue containing the description. @apoorvedave1 to confirm if there is an issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems #65; I'll add the link here. |
||
// the code to support indexes in transitioning states as well. | ||
// See https://github.com/microsoft/hyperspace/issues/65 | ||
val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) | ||
|
||
allIndexes.filter(index => index.created && signatureValid(index)) | ||
if (hybridScanEnabled) { | ||
val filesByRelations = plan | ||
.collect { | ||
case LogicalRelation( | ||
HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), | ||
_, | ||
_, | ||
_) => | ||
location.allFiles.map(f => | ||
FileInfo(f.getPath.toString, f.getLen, f.getModificationTime)) | ||
} | ||
sezruby marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assert(filesByRelations.length == 1) | ||
allIndexes.filter(index => | ||
index.created && isHybridScanCandidate(index, filesByRelations.flatten)) | ||
} else { | ||
allIndexes.filter(index => index.created && signatureValid(index)) | ||
} | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types.{StructField, StructType} | |
import com.microsoft.hyperspace.HyperspaceException | ||
import com.microsoft.hyperspace.actions.Constants | ||
import com.microsoft.hyperspace.index._ | ||
import com.microsoft.hyperspace.index.Hdfs.Properties | ||
|
||
trait HyperspaceRuleTestSuite extends HyperspaceSuite { | ||
private val filenames = Seq("f1.parquet", "f2.parquet") | ||
|
@@ -39,7 +40,13 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite { | |
LogicalPlanSignatureProvider.create(signClass).signature(plan) match { | ||
case Some(s) => | ||
val sourcePlanProperties = SparkPlan.Properties( | ||
Seq(), | ||
Seq( | ||
Relation( | ||
Seq("dummy"), | ||
Hdfs(Properties(Content(Directory("/")))), | ||
"schema", | ||
"format", | ||
Map())), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion: #142 (comment) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is required - empty root directory throws an exception during the test. |
||
null, | ||
null, | ||
LogicalPlanFingerprint(LogicalPlanFingerprint.Properties(Seq(Signature(signClass, s))))) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently this
plan
is always the result ofRuleUtils.getLogicalRelation
meaning thisplan
is expected to be in a fixed shape (linear, etc.). What would be the best way to address this? cc: @apoorvedave1 / @pirzThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @rapoth I modified #160 a bit for this, but I think we need a new uber-issue for "arbitrary source plan" though it will be handled later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! Can you also open an uber issue for that when you get a chance please? (even if it is a work-in-progress issue, let's just capture all the work as we know so far)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I'll 👍