
Commit b7e70af

Modify getCandidateIndex for hybrid scan (#153)

1 parent 1c3b020 commit b7e70af

File tree

4 files changed: +154 -16 lines


src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala

Lines changed: 30 additions & 8 deletions
@@ -41,14 +41,25 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerprint())
   @JsonIgnore
   lazy val files: Seq[Path] = {
     // Recursively find files from directory tree.
-    def rec(prefixPath: Path, directory: Directory): Seq[Path] = {
-      val files = directory.files.map(f => new Path(prefixPath, f.name))
-      files ++ directory.subDirs.flatMap { dir =>
-        rec(new Path(prefixPath, dir.name), dir)
-      }
-    }
+    rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
+  }
 
-    rec(new Path(root.name), root)
+  @JsonIgnore
+  lazy val fileInfos: Set[FileInfo] = {
+    rec(
+      new Path(root.name),
+      root,
+      (f, prefix) => FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime)).toSet
+  }
+
+  private def rec[T](
+      prefixPath: Path,
+      directory: Directory,
+      func: (FileInfo, Path) => T): Seq[T] = {
+    val files = directory.files.map(f => func(f, prefixPath))
+    files ++ directory.subDirs.flatMap { dir =>
+      rec(new Path(prefixPath, dir.name), dir, func)
+    }
   }
 }

@@ -295,7 +306,18 @@ case class IndexLogEntry(
 
   def created: Boolean = state.equals(Constants.States.ACTIVE)
 
-  def relations: Seq[Relation] = source.plan.properties.relations
+  def relations: Seq[Relation] = {
+    // Only one relation is currently supported.
+    assert(source.plan.properties.relations.size == 1)
+    source.plan.properties.relations
+  }
+
+  @JsonIgnore
+  lazy val allSourceFileInfos: Set[FileInfo] = {
+    relations
+      .flatMap(_.data.properties.content.fileInfos)
+      .toSet
+  }
 
   override def equals(o: Any): Boolean = o match {
     case that: IndexLogEntry =>
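Aside from the diff above: the Content change collapses two nearly identical recursive traversals into a single generic rec[T] that takes a mapper function. A minimal, self-contained sketch of the same pattern follows; Leaf and Dir are simplified stand-ins for Hyperspace's FileInfo and Directory, and plain strings stand in for Hadoop Paths.

// Sketch only: simplified stand-ins, not Hyperspace's actual types.
case class Leaf(name: String, size: Long, modifiedTime: Long)
case class Dir(name: String, files: Seq[Leaf], subDirs: Seq[Dir])

// One generic traversal; the caller decides what to build per file.
def rec[T](prefix: String, dir: Dir, func: (Leaf, String) => T): Seq[T] = {
  val files = dir.files.map(f => func(f, prefix))
  files ++ dir.subDirs.flatMap(d => rec(s"$prefix/${d.name}", d, func))
}

val root = Dir("data",
  Seq(Leaf("f1.parquet", 10, 100)),
  Seq(Dir("sub", Seq(Leaf("f2.parquet", 20, 200)), Nil)))

// Mirrors `files`: build a path per file.
val paths = rec(root.name, root, (f, p) => s"$p/${f.name}")
// Mirrors `fileInfos`: build (path, size, modifiedTime) and deduplicate into a Set.
val infos = rec(root.name, root, (f, p) => (s"$p/${f.name}", f.size, f.modifiedTime)).toSet

With this shape, files and fileInfos differ only in the mapper they pass, so the traversal logic lives in one place.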

src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala

Lines changed: 38 additions & 4 deletions
@@ -19,10 +19,10 @@ package com.microsoft.hyperspace.index.rules
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
 
 import com.microsoft.hyperspace.actions.Constants
-import com.microsoft.hyperspace.index.{IndexLogEntry, IndexManager, LogicalPlanSignatureProvider}
+import com.microsoft.hyperspace.index.{FileInfo, IndexLogEntry, IndexManager, LogicalPlanSignatureProvider, PlanSignatureProvider}
 
 object RuleUtils {
 
@@ -31,9 +31,13 @@ object RuleUtils {
    *
    * @param indexManager indexManager
    * @param plan logical plan
+   * @param hybridScanEnabled Flag that indicates whether hybrid scan is enabled.
    * @return indexes built for this plan
    */
-  def getCandidateIndexes(indexManager: IndexManager, plan: LogicalPlan): Seq[IndexLogEntry] = {
+  def getCandidateIndexes(
+      indexManager: IndexManager,
+      plan: LogicalPlan,
+      hybridScanEnabled: Boolean = false): Seq[IndexLogEntry] = {
     // Map of a signature provider to a signature generated for the given plan.
     val signatureMap = mutable.Map[String, Option[String]]()
 
@@ -51,11 +55,41 @@ object RuleUtils {
       }
     }
 
+    def isHybridScanCandidate(entry: IndexLogEntry, filesByRelations: Seq[FileInfo]): Boolean = {
+      // TODO: Some threshold about the similarity of source data files - number of common files
+      //  or total size of common files.
+      //  See https://github.com/microsoft/hyperspace/issues/159
+      // TODO: As in [[PlanSignatureProvider]], source plan signature comparison is required to
+      //  support arbitrary source plans at index creation.
+      //  See https://github.com/microsoft/hyperspace/issues/158
+
+      // Find a common file between the input relation & index source files.
+      // Without the threshold described above, the exists & contains functions suffice here.
+      filesByRelations.exists(entry.allSourceFileInfos.contains)
+    }
+
     // TODO: the following check only considers indexes in ACTIVE state for usage. Update
     //  the code to support indexes in transitioning states as well.
+    //  See https://github.com/microsoft/hyperspace/issues/65
    val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))
 
-    allIndexes.filter(index => index.created && signatureValid(index))
+    if (hybridScanEnabled) {
+      val filesByRelations = plan
+        .collect {
+          case LogicalRelation(
+              HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
+              _,
+              _,
+              _) =>
+            location.allFiles.map(f =>
+              FileInfo(f.getPath.toString, f.getLen, f.getModificationTime))
+        }
+      assert(filesByRelations.length == 1)
+      allIndexes.filter(index =>
+        index.created && isHybridScanCandidate(index, filesByRelations.flatten))
+    } else {
+      allIndexes.filter(index => index.created && signatureValid(index))
+    }
   }
 
   /**
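With hybridScanEnabled set, candidate selection no longer requires an exact plan-signature match: an index remains a candidate as long as the scanned relation still shares at least one file with the index's source data. A small self-contained sketch of that membership test follows; this FileInfo is a simplified stand-in for com.microsoft.hyperspace.index.FileInfo, and the paths, sizes, and timestamps are made up.

// Stand-in for Hyperspace's FileInfo; case-class equality covers path, size, and modifiedTime.
case class FileInfo(path: String, size: Long, modifiedTime: Long)

// Files recorded for the index's source data at creation time.
val indexSourceFiles = Set(
  FileInfo("/data/f1.parquet", 10, 100),
  FileInfo("/data/f2.parquet", 20, 100))

// Files the relation reports now: f2 was deleted and f3 appended.
val relationFiles = Seq(
  FileInfo("/data/f1.parquet", 10, 100),
  FileInfo("/data/f3.parquet", 30, 200))

// As in isHybridScanCandidate: one common file keeps the index as a candidate.
val stillCandidate = relationFiles.exists(indexSourceFiles.contains)  // true

// After an overwrite nothing is shared, so the index is filtered out.
val afterOverwrite = Seq(FileInfo("/data/f9.parquet", 40, 300))
val candidateAfterOverwrite = afterOverwrite.exists(indexSourceFiles.contains)  // false

Because FileInfo equality includes size and modifiedTime, a file modified in place no longer matches its recorded entry; the TODO above tracks replacing the bare exists check with a similarity threshold (issue #159).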

src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala

Lines changed: 8 additions & 1 deletion
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
 import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.index.Hdfs.Properties
 
 trait HyperspaceRuleTestSuite extends HyperspaceSuite {
   private val filenames = Seq("f1.parquet", "f2.parquet")

@@ -39,7 +40,13 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite {
     LogicalPlanSignatureProvider.create(signClass).signature(plan) match {
       case Some(s) =>
         val sourcePlanProperties = SparkPlan.Properties(
-          Seq(),
+          Seq(
+            Relation(
+              Seq("dummy"),
+              Hdfs(Properties(Content(Directory("/")))),
+              "schema",
+              "format",
+              Map())),
           null,
           null,
           LogicalPlanFingerprint(LogicalPlanFingerprint.Properties(Seq(Signature(signClass, s)))))
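One note on the dummy Relation above: IndexLogEntry.relations now asserts exactly one source relation, so test fixtures built with an empty Seq() would trip that assertion before reaching the code under test. A toy illustration of the invariant follows; Entry is a made-up stand-in, not a Hyperspace type.

// Made-up stand-in mirroring the new assert in IndexLogEntry.relations.
case class Entry(relations: Seq[String]) {
  def onlyRelation: String = {
    assert(relations.size == 1, "Only one relation is currently supported.")
    relations.head
  }
}

Entry(Seq("dummy")).onlyRelation  // ok: returns "dummy"
// Entry(Seq.empty).onlyRelation  // would fail the assertion, as the old empty fixture would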

src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala

Lines changed: 78 additions & 3 deletions
@@ -16,14 +16,15 @@
 
 package com.microsoft.hyperspace.index.rules
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileUtil, Path}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache, PartitioningAwareFileIndex}
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
-import com.microsoft.hyperspace.index.IndexCollectionManager
+import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig}
 import com.microsoft.hyperspace.util.PathUtils
 
 class RuleUtilsTest extends HyperspaceRuleTestSuite {

@@ -110,6 +111,80 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite {
     assert(r.isEmpty)
   }
 
+  test("Verify getCandidateIndex for hybrid scan") {
+    val indexManager = IndexCollectionManager(spark)
+    val df = spark.range(1, 5).toDF("id")
+    val dataPath = systemPath.toString + "/hbtable"
+    df.write.parquet(dataPath)
+
+    withIndex("index1") {
+      val readDf = spark.read.parquet(dataPath)
+      indexManager.create(readDf, IndexConfig("index1", Seq("id")))
+
+      def verify(
+          plan: LogicalPlan,
+          hybridScanEnabled: Boolean,
+          expectCandidateIndex: Boolean): Unit = {
+        val indexes = RuleUtils
+          .getCandidateIndexes(indexManager, plan, hybridScanEnabled)
+        if (expectCandidateIndex) {
+          assert(indexes.length == 1)
+          assert(indexes.head.name == "index1")
+        } else {
+          assert(indexes.isEmpty)
+        }
+      }
+
+      // Verify that a candidate index is returned with the unmodified data files whether
+      // hybrid scan is enabled or not.
+      {
+        val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
+        verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = true)
+        verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = true)
+      }
+
+      // Scenario #1: Append new files.
+      df.write.mode("append").parquet(dataPath)
+
+      {
+        val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
+        verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false)
+        verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = true)
+      }
+
+      // Scenario #2: Delete 1 file.
+      {
+        val readDf = spark.read.parquet(dataPath)
+        readDf.queryExecution.optimizedPlan foreach {
+          case LogicalRelation(
+              HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
+              _,
+              _,
+              _) =>
+            systemPath
+              .getFileSystem(new Configuration)
+              .delete(location.allFiles.head.getPath, false)
+          case _ =>
+        }
+      }
+
+      {
+        val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
+        verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false)
+        verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = true)
+      }
+
+      // Scenario #3: Replace all files.
+      df.write.mode("overwrite").parquet(dataPath)
+
+      {
+        val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
+        verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false)
+        verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = false)
+      }
+    }
+  }
+
   private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = {
     val r = RuleUtils.getLogicalRelation(plan)
     assert(r.isDefined)
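Taken together, calling the new overload directly might look like the following; the session setup and /tmp/hbtable path are illustrative only, while IndexCollectionManager and getCandidateIndexes come from the diffs above.

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.index.IndexCollectionManager
import com.microsoft.hyperspace.index.rules.RuleUtils

val spark = SparkSession.builder.master("local[*]").getOrCreate()
val indexManager = IndexCollectionManager(spark)
val plan = spark.read.parquet("/tmp/hbtable").queryExecution.optimizedPlan  // illustrative path

// Default: exact signature match only (previous behavior).
val strict = RuleUtils.getCandidateIndexes(indexManager, plan)

// Hybrid scan: also keep indexes whose source files still overlap the scanned files.
val hybrid = RuleUtils.getCandidateIndexes(indexManager, plan, hybridScanEnabled = true)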
