src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala (30 additions & 8 deletions)
@@ -41,14 +41,25 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerprint())
  @JsonIgnore
  lazy val files: Seq[Path] = {
    // Recursively find files from directory tree.
-    def rec(prefixPath: Path, directory: Directory): Seq[Path] = {
-      val files = directory.files.map(f => new Path(prefixPath, f.name))
-      files ++ directory.subDirs.flatMap { dir =>
-        rec(new Path(prefixPath, dir.name), dir)
-      }
-    }
-    rec(new Path(root.name), root)
+    rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
  }

+  @JsonIgnore
+  lazy val fileInfos: Set[FileInfo] = {
+    rec(
+      new Path(root.name),
+      root,
+      (f, prefix) => FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime)).toSet
+  }
+
+  private def rec[T](
+      prefixPath: Path,
+      directory: Directory,
+      func: (FileInfo, Path) => T): Seq[T] = {
+    val files = directory.files.map(f => func(f, prefixPath))
+    files ++ directory.subDirs.flatMap { dir =>
+      rec(new Path(prefixPath, dir.name), dir, func)
+    }
+  }
}
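For intuition, here is a minimal standalone sketch of the pattern this refactor introduces: one generic tree walk parameterized by a mapping function, from which both the path listing (files) and the metadata listing (fileInfos) fall out. The case classes and string paths below are simplified stand-ins for Hyperspace's types and Hadoop's Path, not the project's actual API.

// Simplified stand-ins for Hyperspace's Directory/FileInfo and Hadoop's Path.
case class FileInfo(name: String, size: Long, modifiedTime: Long)
case class Directory(name: String, files: Seq[FileInfo], subDirs: Seq[Directory])

object TraversalSketch extends App {
  // One generic recursive walk; `func` maps each (file, prefix) pair to a result,
  // so a single traversal can yield paths, metadata records, sizes, and so on.
  def rec[T](prefix: String, dir: Directory, func: (FileInfo, String) => T): Seq[T] =
    dir.files.map(f => func(f, prefix)) ++
      dir.subDirs.flatMap(sub => rec(s"$prefix/${sub.name}", sub, func))

  val root = Directory(
    "data",
    Seq(FileInfo("f1.parquet", 10L, 100L)),
    Seq(Directory("sub", Seq(FileInfo("f2.parquet", 20L, 200L)), Nil)))

  // Equivalent of Content.files: derive full paths.
  println(rec(root.name, root, (f, p) => s"$p/${f.name}"))
  // -> List(data/f1.parquet, data/sub/f2.parquet)

  // Equivalent of Content.fileInfos: derive (path, size, modifiedTime) records.
  println(rec(root.name, root, (f, p) => (s"$p/${f.name}", f.size, f.modifiedTime)).toSet)
}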

@@ -295,7 +306,18 @@ case class IndexLogEntry(

  def created: Boolean = state.equals(Constants.States.ACTIVE)

-  def relations: Seq[Relation] = source.plan.properties.relations
+  def relations: Seq[Relation] = {
+    // Only one relation is currently supported.
+    assert(source.plan.properties.relations.size == 1)
+    source.plan.properties.relations
+  }
+
+  @JsonIgnore
+  lazy val allSourceFileInfos: Set[FileInfo] = {
+    relations
+      .flatMap(_.data.properties.content.fileInfos)
+      .toSet
+  }

  override def equals(o: Any): Boolean = o match {
    case that: IndexLogEntry =>
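One detail worth noting before the rule changes below: the hybrid scan check compares FileInfo values, not bare paths. Assuming FileInfo is a case class over (path, size, modifiedTime) with structural equality (consistent with how it is constructed above), a file overwritten in place keeps its path but stops matching. A small illustrative sketch:

// Assumes case-class equality over (path, size, modifiedTime); values illustrative.
val indexed = Set(FileInfo("/data/f1.parquet", 10L, 100L))

indexed.contains(FileInfo("/data/f1.parquet", 10L, 100L)) // true: file unchanged
indexed.contains(FileInfo("/data/f1.parquet", 25L, 300L)) // false: rewritten in place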
src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala

@@ -19,10 +19,10 @@ package com.microsoft.hyperspace.index.rules
import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}

import com.microsoft.hyperspace.actions.Constants
-import com.microsoft.hyperspace.index.{IndexLogEntry, IndexManager, LogicalPlanSignatureProvider}
+import com.microsoft.hyperspace.index.{FileInfo, IndexLogEntry, IndexManager, LogicalPlanSignatureProvider, PlanSignatureProvider}

object RuleUtils {
@@ -31,9 +31,13 @@ object RuleUtils {
   *
   * @param indexManager indexManager
   * @param plan logical plan
+   * @param hybridScanEnabled Flag that checks if hybrid scan is enabled or disabled.
   * @return indexes built for this plan
   */
-  def getCandidateIndexes(indexManager: IndexManager, plan: LogicalPlan): Seq[IndexLogEntry] = {
+  def getCandidateIndexes(
+      indexManager: IndexManager,
+      plan: LogicalPlan,
+      hybridScanEnabled: Boolean = false): Seq[IndexLogEntry] = {

Review thread on the plan parameter:
Contributor: Currently this plan is always the result of RuleUtils.getLogicalRelation, meaning this plan is expected to be in a fixed shape (linear, etc.). What would be the best way to address this? cc: @apoorvedave1 / @pirz
Collaborator (author): cc @rapoth I modified #160 a bit for this, but I think we need a new uber-issue for "arbitrary source plan", though it will be handled later.
Contributor: Good idea! Can you also open an uber issue for that when you get a chance please? (Even if it is a work-in-progress issue, let's just capture all the work as we know so far.)
Collaborator (author): Ok, I'll 👍

Review thread on the hybridScanEnabled default value:
Contributor: nit: Can we remove the default value? Rationale: this is a user-facing config, which is set to false by default. If we later change the default to true, we need to remember to update this to true as well.
Collaborator (author): It's temporary for this PR; I'll remove the default value assignment and use the config value instead in the next PR.
Contributor: OK. In that case, please leave a review comment on your intention. That helps reviewers.

    // Map of a signature provider to a signature generated for the given plan.
    val signatureMap = mutable.Map[String, Option[String]]()

@@ -51,11 +55,41 @@ object RuleUtils {
      }
    }

+    def isHybridScanCandidate(entry: IndexLogEntry, filesByRelations: Seq[FileInfo]): Boolean = {
+      // TODO: Some threshold about the similarity of source data files - number of common files or
+      // total size of common files.
+      // See https://github.com/microsoft/hyperspace/issues/159
+      // TODO: As in [[PlanSignatureProvider]], Source plan signature comparison is required to
+      // support arbitrary source plans at index creation.
+      // See https://github.com/microsoft/hyperspace/issues/158
+
+      // Find a common file between the input relation & index source files.
+      // Without the threshold described above, we can utilize exists & contain functions here.
+      filesByRelations.exists(entry.allSourceFileInfos.contains)
+    }
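The first TODO above (issue #159) leaves the similarity threshold unspecified. As a hedged sketch of one possible shape for it (a byte-ratio cutoff rather than a single shared file), the helper and its minCommonBytesRatio parameter below are hypothetical and not part of this PR:

// Hypothetical sketch for issue #159, not part of this PR: accept an index only if
// the files it shares with the relation cover at least `minCommonBytesRatio` of the
// indexed source bytes, instead of accepting any single common file.
def isHybridScanCandidateByBytes(
    entry: IndexLogEntry,
    filesByRelations: Seq[FileInfo],
    minCommonBytesRatio: Double): Boolean = {
  val commonBytes =
    filesByRelations.filter(entry.allSourceFileInfos.contains).map(_.size).sum.toDouble
  val indexedBytes = entry.allSourceFileInfos.toSeq.map(_.size).sum.toDouble
  indexedBytes > 0 && commonBytes / indexedBytes >= minCommonBytesRatio
}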

    // TODO: the following check only considers indexes in ACTIVE state for usage. Update
    // the code to support indexes in transitioning states as well.
+    // See https://github.com/microsoft/hyperspace/issues/65

Review thread on the ACTIVE-state TODO:
Contributor: I remember there is already an issue for this. If so, can we just link to the issue containing the description? @apoorvedave1 to confirm if there is an issue.
Collaborator (author): It seems to be #65; I'll add the link here.
    val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))

-    allIndexes.filter(index => index.created && signatureValid(index))
+    if (hybridScanEnabled) {
+      val filesByRelations = plan
+        .collect {
+          case LogicalRelation(
+              HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
+              _,
+              _,
+              _) =>
+            location.allFiles.map(f =>
+              FileInfo(f.getPath.toString, f.getLen, f.getModificationTime))
+        }
+      assert(filesByRelations.length == 1)
+      allIndexes.filter(index =>
+        index.created && isHybridScanCandidate(index, filesByRelations.flatten))
+    } else {
+      allIndexes.filter(index => index.created && signatureValid(index))
+    }
  }
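For orientation, a hedged usage sketch of the two branches above (the spark, indexManager, and dataPath names are assumed to exist, as in the test below): with the flag off, any change to the source files invalidates the plan signature and filters the index out; with it on, sharing a single source file keeps the index as a candidate.

// Assumes a SparkSession `spark`, an IndexManager `indexManager`, and previously
// indexed parquet data under `dataPath`; illustrative only.
val plan = spark.read.parquet(dataPath).queryExecution.optimizedPlan

// Signature-based matching: source files must be exactly those seen at indexing time.
val strict = RuleUtils.getCandidateIndexes(indexManager, plan)

// Hybrid scan matching: one common source file is currently enough (no threshold yet).
val hybrid = RuleUtils.getCandidateIndexes(indexManager, plan, hybridScanEnabled = true)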

src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala

@@ -26,6 +26,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.index.Hdfs.Properties

trait HyperspaceRuleTestSuite extends HyperspaceSuite {
  private val filenames = Seq("f1.parquet", "f2.parquet")

@@ -39,7 +40,13 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite {
    LogicalPlanSignatureProvider.create(signClass).signature(plan) match {
      case Some(s) =>
        val sourcePlanProperties = SparkPlan.Properties(
-          Seq(),
+          Seq(
+            Relation(
+              Seq("dummy"),
+              Hdfs(Properties(Content(Directory("/")))),
+              "schema",
+              "format",
+              Map())),
Review thread on the dummy Relation:
Contributor: Suggestion: #142 (comment)
Collaborator (author, @sezruby, Sep 15, 2020): This is required; an empty root directory throws an exception during the test.

          null,
          null,
          LogicalPlanFingerprint(LogicalPlanFingerprint.Properties(Seq(Signature(signClass, s)))))
src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala

@@ -16,14 +16,15 @@

package com.microsoft.hyperspace.index.rules

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache, PartitioningAwareFileIndex}
import org.apache.spark.sql.types.{IntegerType, StringType}

-import com.microsoft.hyperspace.index.IndexCollectionManager
+import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig}
import com.microsoft.hyperspace.util.PathUtils

class RuleUtilsTest extends HyperspaceRuleTestSuite {

@@ -110,6 +111,80 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite {
    assert(r.isEmpty)
  }

+  test("Verify getCandidateIndex for hybrid scan") {
+    val indexManager = IndexCollectionManager(spark)
+    val df = spark.range(1, 5).toDF("id")
+    val dataPath = systemPath.toString + "/hbtable"
+    df.write.parquet(dataPath)
+
+    withIndex("index1") {
+      val readDf = spark.read.parquet(dataPath)
+      indexManager.create(readDf, IndexConfig("index1", Seq("id")))
+
+      def verify(
+          plan: LogicalPlan,
+          hybridScanEnabled: Boolean,
+          expectCandidateIndex: Boolean): Unit = {
+        val indexes = RuleUtils
+          .getCandidateIndexes(indexManager, plan, hybridScanEnabled)
+        if (expectCandidateIndex) {
+          assert(indexes.length == 1)
+          assert(indexes.head.name == "index1")
+        } else {
+          assert(indexes.isEmpty)
+        }
+      }
+
+      // Verify that a candidate index is returned with the unmodified data files whether
+      // hybrid scan is enabled or not.
+      {
+        val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
+        verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = true)
+        verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = true)
+      }
+
+      // Scenario #1: Append new files.
+      df.write.mode("append").parquet(dataPath)
+
+      {
+        val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
+        verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false)
+        verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = true)
+      }
+
+      // Scenario #2: Delete 1 file.
+      {
+        val readDf = spark.read.parquet(dataPath)
+        readDf.queryExecution.optimizedPlan foreach {
+          case LogicalRelation(
+              HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
+              _,
+              _,
+              _) =>
+            systemPath
+              .getFileSystem(new Configuration)
+              .delete(location.allFiles.head.getPath, false)
+          case _ =>
+        }
+      }
+
+      {
+        val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
+        verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false)
+        verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = true)
+      }
+
+      // Scenario #3: Replace all files.
+      df.write.mode("overwrite").parquet(dataPath)
+
+      {
+        val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
+        verify(optimizedPlan, hybridScanEnabled = false, expectCandidateIndex = false)
+        verify(optimizedPlan, hybridScanEnabled = true, expectCandidateIndex = false)
+      }
+    }
+  }

  private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = {
    val r = RuleUtils.getLogicalRelation(plan)
    assert(r.isDefined)