Commit de19c2e

Issue Qbeast-io#398: Qbeast Spark Refactor -- Delta abstraction (Qbeast-io#446)

* Issue Qbeast-io#417: Abstract the Qbeast Snapshot Module (Qbeast-io#411)

* Issue Qbeast-io#418: Abstract PreCommitHook and StagingDataManager (Qbeast-io#421)

* Issue Qbeast-io#418: Abstract RollupDataWriter and QbeastStats (Qbeast-io#423)

* Issue Qbeast-io#419: Code Reorganization (Qbeast-io#425)

* Issue Qbeast-io#420: Create Separate Modules (Qbeast-io#427)

* Issue Qbeast-io#398: Fix small overhead added during the refactoring (Qbeast-io#436)

* Issue Qbeast-io#441: Fix data change on optimize (Qbeast-io#442)
JosepSampe authored Oct 28, 2024
1 parent 2089b33 commit de19c2e
Showing 229 changed files with 1,639 additions and 1,049 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-artifact.yml
@@ -31,7 +31,7 @@ jobs:
        run: sbt "scalafixAll --check"
      - name: Test
        run: |
-          sbt coverage 'test' coverageReport
+          sbt coverage 'qbeastCore/test' 'qbeastDelta/test' 'qbeastSpark/test' coverageReport
      - name: Upload to Codecov
        run: |
          curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import # One-time step
18 changes: 9 additions & 9 deletions CONTRIBUTING.md
@@ -108,7 +108,7 @@ The following log levels are used to track code behaviour:
  def writeWithTransaction(writer: => (TableChanges, Seq[FileAction])): Unit = {
    // [...] Code to write the transaction [...]
    if (txn.appId == appId && version <= txn.version) {
-      val message = s"Transaction ${version} from application ${appId} is already completed," +
+      val message = s"Transaction $version from application $appId is already completed," +
        " the requested write is ignored"
      logWarn(message)
      return
@@ -131,7 +131,7 @@ The following log levels are used to track code behaviour:
  if (isNewRevision(options)) {
    // Merging revisions code
    logDebug(
-      s"Merging transformations for table ${tableID} with cubeSize=${newRevisionCubeSize}")
+      s"Merging transformations for table $tableID with cubeSize=$newRevisionCubeSize")
    // Code to merge revisions
  }
```
@@ -144,9 +144,9 @@ The following log levels are used to track code behaviour:
      indexStatus: IndexStatus,
      options: QbeastOptions,
      append: Boolean): Unit = {
-    logTrace(s"Begin: Writing data to table ${tableID}")
+    logTrace(s"Begin: Writing data to table $tableID")
    // [...] Code to write the data [...]
-    logTrace(s"End: Writing data to table ${tableID}")
+    logTrace(s"End: Writing data to table $tableID")
  }
```

@@ -191,10 +191,10 @@ For example:
sbt assembly

$SPARK_HOME/bin/spark-shell \
-  --jars ./target/scala-2.12/qbeast-spark-assembly-0.6.0.jar \
+  --jars ./target/scala-2.12/qbeast-spark-assembly-0.8.0-SNAPSHOT.jar \
  --packages io.delta:delta-spark_2.12:3.1.0 \
-  --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
-  --conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
+  --conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
+  --conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog
```

### 4. Publishing artefacts in the local repository
@@ -280,8 +280,8 @@ To publish a new version of the qbeast-spark project, follow these steps:
export QBEAST_SPARK_VERSION=0.6.0-SNAPSHOT
$SPARK_350/bin/spark-shell --repositories https://s01.oss.sonatype.org/content/repositories/snapshots \
  --packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
-  --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
-  --conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
+  --conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
+  --conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog

```
6. If everything is ok, change the `build.sbt` with the corresponding version and publish the RC.
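Both CONTRIBUTING snippets above switch the session extension and catalog to their new class names, `io.qbeast.sql.QbeastSparkSessionExtension` and `io.qbeast.catalog.QbeastCatalog`. A quick sanity check for a shell launched this way, as a minimal sketch: the `qbeast` format and `columnsToIndex` option follow the project's documented write path, and the /tmp path is illustrative.

```scala
// Write and read back a tiny qbeast table to confirm the extension is active.
val df = spark.range(100).toDF("id")
df.write.format("qbeast").option("columnsToIndex", "id").save("/tmp/qbeast_smoke")
spark.read.format("qbeast").load("/tmp/qbeast_smoke").count() // expected: 100
```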
44 changes: 38 additions & 6 deletions build.sbt
@@ -3,26 +3,49 @@ import Dependencies._

val mainVersion = "0.8.0-SNAPSHOT"

+// Projects
+lazy val qbeastCore = (project in file("./core"))
+  .settings(
+    name := "qbeast-core",
+    libraryDependencies ++= Seq(sparkCore % Provided, sparkSql % Provided, sparkml % Provided),
+    Test / parallelExecution := false,
+    assembly / test := {},
+    assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
+  .settings(noWarningInConsole)
+
+lazy val qbeastDelta = (project in file("./delta"))
+  .dependsOn(qbeastCore)
+  .settings(
+    name := "qbeast-delta",
+    libraryDependencies ++= Seq(sparkCore % Provided, deltaSpark % Provided, sparkSql % Provided),
+    Test / parallelExecution := false,
+    assembly / test := {},
+    assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
+  .settings(noWarningInConsole)
+
lazy val qbeastSpark = (project in file("."))
+  .dependsOn(qbeastCore, qbeastDelta)
  .enablePlugins(ScalaUnidocPlugin)
  .settings(
    name := "qbeast-spark",
    libraryDependencies ++= Seq(
      sparkCore % Provided,
      sparkSql % Provided,
      hadoopClient % Provided,
      deltaSpark % Provided,
-      sparkml % Provided,
      apacheCommons % Test,
      amazonAws % Test,
      hadoopCommons % Test,
+      sparkml % Test,
      hadoopAws % Test),
    Test / parallelExecution := false,
    assembly / test := {},
    assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
  .settings(noWarningInConsole)

+qbeastCore / Compile / doc / scalacOptions ++= Seq(
+  "-doc-title",
+  "qbeast-core",
+  "-doc-version",
+  mainVersion,
+  "-doc-footer",
+  "Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-core")
+
qbeastSpark / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-spark",
@@ -31,6 +54,14 @@ qbeastSpark / Compile / doc / scalacOptions ++= Seq(
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-spark")

qbeastDelta / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-delta",
"-doc-version",
mainVersion,
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-delta")

// Common metadata
ThisBuild / version := mainVersion
ThisBuild / organization := "io.qbeast"
@@ -45,6 +76,7 @@ ThisBuild / libraryDependencies ++= Seq(
  mockito % Test)

Test / javaOptions ++= Seq("-Xmx10G", "-XX:+UseG1GC")
+Test / testOptions += Tests.Argument("-oD")
Test / fork := true

// Scala compiler settings
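The build now produces three modules, `qbeast-core`, `qbeast-delta`, and `qbeast-spark`, which is also why the CI workflow above runs `qbeastCore/test`, `qbeastDelta/test`, and `qbeastSpark/test` separately. A hedged sketch of consuming the split artifacts from another sbt build follows; the artifact names come from the `name :=` settings above, but publishing all three under `io.qbeast` is an assumption.

```scala
// Hypothetical consumer build.sbt: depend only on the layers you need.
libraryDependencies ++= Seq(
  "io.qbeast" %% "qbeast-core" % "0.8.0-SNAPSHOT", // format-agnostic core model
  "io.qbeast" %% "qbeast-delta" % "0.8.0-SNAPSHOT", // Delta Lake binding
  "io.qbeast" %% "qbeast-spark" % "0.8.0-SNAPSHOT" // Spark integration
)
```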
TableChanges.scala

@@ -15,11 +15,11 @@
 */
package io.qbeast.core.model

-import io.qbeast.spark.model.CubeState
-import io.qbeast.spark.model.CubeState.CubeStateValue
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

+import CubeState.CubeStateValue
+
/**
 * Container for the table changes
 */
ColumnsToIndexSelector.scala

@@ -15,12 +15,12 @@
 */
package io.qbeast.core.model

+import org.apache.spark.sql.DataFrame
+
/**
 * ColumnsToIndexSelector interface to automatically select which columns to index.
- * @tparam DATA
- *   the data to index
 */
-trait ColumnsToIndexSelector[DATA] {
+trait ColumnsToIndexSelector {

  /**
   * The maximum number of columns to index.
@@ -34,7 +34,7 @@ trait ColumnsToIndexSelector[DATA] {
   * the data to index
   * @return
   */
-  def selectColumnsToIndex(data: DATA): Seq[String] =
+  def selectColumnsToIndex(data: DataFrame): Seq[String] =
    selectColumnsToIndex(data, MAX_COLUMNS_TO_INDEX)

  /**
@@ -46,6 +46,6 @@ trait ColumnsToIndexSelector[DATA] {
   * @return
   *   A sequence with the names of the columns to index
   */
-  def selectColumnsToIndex(data: DATA, numColumnsToIndex: Int): Seq[String]
+  def selectColumnsToIndex(data: DataFrame, numColumnsToIndex: Int): Seq[String]

}
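The trait drops its `DATA` type parameter and commits to Spark's `DataFrame`. A minimal implementation sketch against the visible members; `FirstNumericColumnsSelector` is hypothetical, the repo's actual selector may work differently, and the exact declaration of `MAX_COLUMNS_TO_INDEX` is collapsed in this diff.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.NumericType

// Hypothetical selector: index the first numeric columns, up to the limit.
object FirstNumericColumnsSelector extends ColumnsToIndexSelector {

  // Member name taken from the call above; declaring it as a val is an assumption.
  override val MAX_COLUMNS_TO_INDEX: Int = 3

  override def selectColumnsToIndex(data: DataFrame, numColumnsToIndex: Int): Seq[String] =
    data.schema.fields
      .collect { case f if f.dataType.isInstanceOf[NumericType] => f.name }
      .take(numColumnsToIndex)
      .toSeq
}
```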
CubeState.scala

@@ -13,7 +13,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package io.qbeast.spark.model
+package io.qbeast.core.model

/**
 * Names of possible states of the cube
DataWriter.scala

@@ -16,17 +16,13 @@
package io.qbeast.core.model

import io.qbeast.IISeq
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.DataFrame

/**
 * Data Writer template
- * @tparam DATA
- *   type of data
- * @tparam DataSchema
- *   type of the data schema
- * @tparam FileDescriptor
- *   type of file descriptor
 */
-trait DataWriter[DATA, DataSchema, FileDescriptor] {
+trait DataWriter {

  /**
   * Write the index data to the files
@@ -43,8 +39,8 @@ trait DataWriter[DATA, DataSchema, FileDescriptor] {
   */
  def write(
      tableID: QTableID,
-      schema: DataSchema,
-      data: DATA,
-      tableChanges: TableChanges): IISeq[FileDescriptor]
+      schema: StructType,
+      data: DataFrame,
+      tableChanges: TableChanges): IISeq[IndexFile]

}
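With the `DATA`/`DataSchema`/`FileDescriptor` parameters gone, callers pass Spark types directly and always get `IndexFile`s back. A hypothetical call site, with names illustrative:

```scala
import io.qbeast.IISeq
import org.apache.spark.sql.DataFrame

// Hypothetical call site: the schema travels as a Spark StructType taken
// straight from the DataFrame, and the result is always IndexFiles.
def writeIndexed(
    writer: DataWriter,
    tableID: QTableID,
    data: DataFrame,
    tableChanges: TableChanges): IISeq[IndexFile] =
  writer.write(tableID, data.schema, data, tableChanges)
```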
IndexFileBuilder.scala

@@ -23,9 +23,11 @@ import scala.collection.immutable
final class IndexFileBuilder {
  private var path: Option[String] = None
  private var size: Long = 0L
+  private var dataChange: Boolean = true
  private var modificationTime: Long = 0L
  private var revisionId: RevisionID = 0L
  private val blocks = immutable.Seq.newBuilder[VolatileBlock]
+  private var stats: Option[QbeastStats] = None

  /**
   * Sets the path.
@@ -53,6 +55,32 @@ final class IndexFileBuilder {
    this
  }

+  /**
+   * Sets the stats.
+   *
+   * @param stats
+   *   the stats
+   * @return
+   *   this instance
+   */
+  def setStats(stats: Option[QbeastStats]): IndexFileBuilder = {
+    this.stats = stats
+    this
+  }
+
+  /**
+   * Sets the data change.
+   *
+   * @param dataChange
+   *   whether this index file represents a data change
+   * @return
+   *   this instance
+   */
+  def setDataChange(dataChange: Boolean): IndexFileBuilder = {
+    this.dataChange = dataChange
+    this
+  }
+
  /**
   * Sets the modification time
   *
@@ -103,9 +131,11 @@ final class IndexFileBuilder {
    IndexFile(
      filePath,
      size,
+      dataChange,
      modificationTime,
      revisionId,
-      blocks.result().map(_.toBlock(filePath)))
+      blocks.result().map(_.toBlock(filePath)),
+      stats)
  }

}
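The two new builder fields line up with the commit bullets above: `stats` carries the optional per-file `QbeastStats`, and `dataChange` lets an optimize rewrite mark its output as not being new data (Qbeast-io#442). A hedged usage sketch; `setPath` and `setModificationTime` match the docs shown, while `setSize` and `build` are assumed by analogy.

```scala
// Illustrative: an optimize rewrite emits files with dataChange = false,
// so log readers can tell rewritten data from genuinely new data.
val optimizedFile: IndexFile = new IndexFileBuilder()
  .setPath("part-00000.parquet") // illustrative file name
  .setSize(1024L) // assumed setter
  .setDataChange(false) // optimize rewrites are not a data change
  .setModificationTime(System.currentTimeMillis())
  .setStats(None) // no per-file stats in this sketch
  .build() // assumed method name
```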
IndexManager.scala

@@ -16,13 +16,12 @@
package io.qbeast.core.model

import io.qbeast.IISeq
+import org.apache.spark.sql.DataFrame

/**
 * Index Manager template
- * @tparam DATA
- *   type of data to index
 */
-trait IndexManager[DATA] {
+trait IndexManager {

  /**
   * Indexes the data
@@ -33,7 +32,7 @@ trait IndexManager[DATA] {
   * @return
   *   the changes of the index and reorganization of data
   */
-  def index(data: DATA, indexStatus: IndexStatus): (DATA, TableChanges)
+  def index(data: DataFrame, indexStatus: IndexStatus): (DataFrame, TableChanges)

  /**
   * Optimizes the index
@@ -44,7 +43,7 @@ trait IndexManager[DATA] {
   * @return
   *   the changes on the index and reorganization of data
   */
-  def optimize(data: DATA, indexStatus: IndexStatus): (DATA, TableChanges)
+  def optimize(data: DataFrame, indexStatus: IndexStatus): (DataFrame, TableChanges)

  /**
   * Analyzes the current index status
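Same de-generification here: `index` and `optimize` now consume and return `DataFrame`s. A hypothetical call site, names illustrative:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical: index new data; the returned (possibly reorganized) DataFrame
// and its TableChanges are what a DataWriter consumes next.
def indexForWrite(
    indexManager: IndexManager,
    data: DataFrame,
    indexStatus: IndexStatus): (DataFrame, TableChanges) =
  indexManager.index(data, indexStatus)
```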
MetadataManager.scala

@@ -15,18 +15,14 @@
 */
package io.qbeast.core.model

+import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.IISeq
+import org.apache.spark.sql.types.StructType

/**
 * Metadata Manager template
- * @tparam DataSchema
- *   type of data schema
- * @tparam FileDescriptor
- *   type of file descriptor
- * @tparam QbeastOptions
- *   type of the Qbeast options
 */
-trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
+trait MetadataManager {
  type Configuration = Map[String, String]

  /**
@@ -45,7 +41,7 @@ trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
   * @return
   *   the current schema
   */
-  def loadCurrentSchema(tableID: QTableID): DataSchema
+  def loadCurrentSchema(tableID: QTableID): StructType

  /**
   * Writes and updates the metadata by using transaction control
@@ -60,9 +56,9 @@ trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
   */
  def updateWithTransaction(
      tableID: QTableID,
-      schema: DataSchema,
+      schema: StructType,
      options: QbeastOptions,
-      append: Boolean)(writer: => (TableChanges, IISeq[FileDescriptor])): Unit
+      append: Boolean)(writer: => (TableChanges, IISeq[IndexFile], IISeq[DeleteFile])): Unit

  /**
   * Updates the table metadata by overwriting the metadata configurations with the provided
@@ -74,7 +70,7 @@ trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
   * @param update
   *   configurations used to overwrite the existing metadata
   */
-  def updateMetadataWithTransaction(tableID: QTableID, schema: DataSchema)(
+  def updateMetadataWithTransaction(tableID: QTableID, schema: StructType)(
      update: => Configuration): Unit

  /**
@@ -127,6 +123,7 @@ trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
  /**
   * Creates an initial log directory
   * @param tableID
+   *   table ID
   */
  def createLog(tableID: QTableID): Unit
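Taken together, the refactored traits compose without any format-specific type parameters: the writer closure handed to `updateWithTransaction` now returns the `TableChanges` plus the `IndexFile`s added and the `DeleteFile`s removed. A hedged end-to-end sketch, assuming `IISeq` is an immutable `IndexedSeq` (which `Vector` satisfies); the composition is illustrative, not the repo's actual write path:

```scala
import io.qbeast.IISeq
import org.apache.spark.sql.DataFrame

// Illustrative append flow across the refactored traits.
def appendToTable(
    metadataManager: MetadataManager,
    indexManager: IndexManager,
    dataWriter: DataWriter,
    tableID: QTableID,
    data: DataFrame,
    indexStatus: IndexStatus,
    options: QbeastOptions): Unit =
  metadataManager.updateWithTransaction(tableID, data.schema, options, append = true) {
    val (indexed, tableChanges) = indexManager.index(data, indexStatus)
    val addedFiles = dataWriter.write(tableID, indexed.schema, indexed, tableChanges)
    // A plain append deletes nothing; an optimize would return DeleteFiles here.
    (tableChanges, addedFiles, Vector.empty[DeleteFile])
  }
```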