Changes to pass table identifier in CC (Coordinated Commits)
prakharjain09 committed Aug 27, 2024
1 parent a33c191 commit 52447f6
Showing 19 changed files with 216 additions and 240 deletions.
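Note: for orientation, the hunks below replace the separate (logPath, coordinatedCommitsTableConf) parameters with a single TableDescriptor. The following is a minimal sketch of the shape that class must have, inferred only from the call sites in this commit; the real io.delta.storage.commit.TableDescriptor may carry extra state or validation, and the getTableIdentifier accessor is an assumption.

import java.util.Map;
import java.util.Optional;

import io.delta.storage.commit.TableIdentifier;
import org.apache.hadoop.fs.Path;

// Sketch only: approximates the TableDescriptor shape implied by this commit.
public class TableDescriptorSketch {
  private final Path logPath;
  private final Optional<TableIdentifier> tableIdentifier;
  private final Map<String, String> tableConf;

  // Matches the three-argument construction seen in TableCommitCoordinatorClient
  // and DeltaLogSuite below.
  public TableDescriptorSketch(
      Path logPath,
      Optional<TableIdentifier> tableIdentifier,
      Map<String, String> tableConf) {
    this.logPath = logPath;
    this.tableIdentifier = tableIdentifier;
    this.tableConf = tableConf;
  }

  // Accessors used throughout the diff.
  public Path getLogPath() { return logPath; }
  public Map<String, String> getTableConf() { return tableConf; }

  // Assumed accessor: only the constructor argument is visible in this diff.
  public Optional<TableIdentifier> getTableIdentifier() { return tableIdentifier; }
}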
@@ -32,10 +32,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
 
 /**
  * A commit coordinator client that uses DynamoDB as the commit coordinator. The table schema is as follows:
@@ -350,11 +347,11 @@ DynamoDBTableEntryConstants.TABLE_LATEST_TIMESTAMP, new AttributeValueUpdate()
   public CommitResponse commit(
       LogStore logStore,
       Configuration hadoopConf,
-      Path logPath,
-      Map<String, String> coordinatedCommitsTableConf,
+      TableDescriptor tableDesc,
       long commitVersion,
       Iterator<String> actions,
       UpdatedActions updatedActions) throws CommitFailedException {
+    Path logPath = tableDesc.getLogPath();
     if (commitVersion == 0) {
       throw new CommitFailedException(
           false /* retryable */,
@@ -375,7 +372,7 @@ public CommitResponse commit(
         commitVersion, commitPath);
     CommitResponse res = commitToCoordinator(
         logPath,
-        coordinatedCommitsTableConf,
+        tableDesc.getTableConf(),
         commitVersion,
         commitFileStatus,
         inCommitTimestamp,
@@ -393,8 +390,7 @@ public CommitResponse commit(
       backfillToVersion(
           logStore,
           hadoopConf,
-          logPath,
-          coordinatedCommitsTableConf,
+          tableDesc,
           commitVersion,
           null /* lastKnownBackfilledVersion */);
     }
@@ -456,13 +452,12 @@ private GetCommitsResultInternal getCommitsImpl(
 
   @Override
   public GetCommitsResponse getCommits(
-      Path logPath,
-      Map<String, String> coordinatedCommitsTableConf,
+      TableDescriptor tableDesc,
       Long startVersion,
       Long endVersion) {
     try {
       GetCommitsResultInternal res =
-          getCommitsImpl(logPath, coordinatedCommitsTableConf, startVersion, endVersion);
+          getCommitsImpl(tableDesc.getLogPath(), tableDesc.getTableConf(), startVersion, endVersion);
       long latestTableVersionToReturn = res.response.getLatestTableVersion();
       if (!res.hasAcceptedCommits) {
         /*
@@ -533,16 +528,16 @@ private void validateBackfilledFileExists(
   public void backfillToVersion(
       LogStore logStore,
       Configuration hadoopConf,
-      Path logPath,
-      Map<String, String> coordinatedCommitsTableConf,
+      TableDescriptor tableDesc,
       long version,
       Long lastKnownBackfilledVersion) throws IOException {
     LOG.info("Backfilling all unbackfilled commits.");
+    Path logPath = tableDesc.getLogPath();
     GetCommitsResponse resp;
     try {
       resp = getCommitsImpl(
           logPath,
-          coordinatedCommitsTableConf,
+          tableDesc.getTableConf(),
           lastKnownBackfilledVersion,
           null).response;
     } catch (IOException e) {
@@ -582,7 +577,7 @@ public void backfillToVersion(
             .withTableName(coordinatedCommitsTableName)
             .addKeyEntry(
                 DynamoDBTableEntryConstants.TABLE_ID,
-                new AttributeValue().withS(getTableId(coordinatedCommitsTableConf)))
+                new AttributeValue().withS(getTableId(tableDesc.getTableConf())))
             .addAttributeUpdatesEntry(
                 DynamoDBTableEntryConstants.COMMITS,
                 new AttributeValueUpdate()
Expand Down Expand Up @@ -624,6 +619,7 @@ public void backfillToVersion(
@Override
public Map<String, String> registerTable(
Path logPath,
Optional<TableIdentifier> tableIdentifier,
long currentVersion,
AbstractMetadata currentMetadata,
AbstractProtocol currentProtocol) {
Expand Down
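Note: a hedged sketch of the new calling convention for commit() after this file's changes; the helper class and method names here are hypothetical, but the signature matches the hunks above. The log path and the coordinated-commits table configuration now travel together in one TableDescriptor, with Optional.empty() when no catalog identifier is known.

import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

import io.delta.storage.LogStore;
import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.CommitFailedException;
import io.delta.storage.commit.CommitResponse;
import io.delta.storage.commit.TableDescriptor;
import io.delta.storage.commit.UpdatedActions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Hypothetical caller; names are illustrative, not part of the commit.
class CommitCaller {
  static CommitResponse commitOnce(
      CommitCoordinatorClient client,
      LogStore logStore,
      Configuration hadoopConf,
      Path logPath,
      Map<String, String> tableConf,
      long commitVersion,
      Iterator<String> actions,
      UpdatedActions updatedActions) throws CommitFailedException {
    // Bundle the two old arguments into the descriptor the new API expects.
    TableDescriptor tableDesc =
        new TableDescriptor(logPath, Optional.empty(), tableConf);
    return client.commit(
        logStore, hadoopConf, tableDesc, commitVersion, actions, updatedActions);
  }
}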
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta
 
 // scalastyle:off import.ordering.noEmptyLine
 import java.nio.file.FileAlreadyExistsException
-import java.util.{ConcurrentModificationException, UUID}
+import java.util.{ConcurrentModificationException, Optional, UUID}
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
 import scala.collection.JavaConverters._
@@ -1707,7 +1707,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
           log"[commit-coordinator: ${MDC(DeltaLogKeys.COORDINATOR_NAME, commitCoordinatorName)}" +
           log", conf: ${MDC(DeltaLogKeys.COORDINATOR_CONF, commitCoordinatorConf)}]")
         newCoordinatedCommitsTableConf = Some(newCommitCoordinatorClient.registerTable(
-          deltaLog.logPath, readVersion, finalMetadata, protocol).asScala.toMap)
+          deltaLog.logPath, Optional.empty(), readVersion, finalMetadata, protocol).asScala.toMap)
       case (None, Some(readCommitCoordinatorClient)) =>
         // CC -> FS conversion
         val (newOwnerName, newOwnerConf) =
@@ -2277,11 +2277,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     override def commit(
         logStore: io.delta.storage.LogStore,
         hadoopConf: Configuration,
-        logPath: Path,
-        coordinatedCommitsTableConf: java.util.Map[String, String],
+        tableDesc: TableDescriptor,
         commitVersion: Long,
         actions: java.util.Iterator[String],
         updatedActions: UpdatedActions): CommitResponse = {
+      val logPath = tableDesc.getLogPath
       // Get thread local observer for Fuzz testing purpose.
       val executionObserver = TransactionExecutionObserver.getObserver
       val commitFile = util.FileNames.unsafeDeltaFile(logPath, commitVersion)
@@ -2315,17 +2315,15 @@ }
     }
 
     override def getCommits(
-        logPath: Path,
-        coordinatedCommitsTableConf: java.util.Map[String, String],
+        tableDesc: TableDescriptor,
         startVersion: java.lang.Long,
         endVersion: java.lang.Long): GetCommitsResponse =
       new GetCommitsResponse(Seq.empty.asJava, -1)
 
     override def backfillToVersion(
         logStore: io.delta.storage.LogStore,
         hadoopConf: Configuration,
-        logPath: Path,
-        coordinatedCommitsTableConf: java.util.Map[String, String],
+        tableDesc: TableDescriptor,
         version: Long,
         lastKnownBackfilledVersion: java.lang.Long): Unit = {}
 
@@ -2344,6 +2342,7 @@
 
     override def registerTable(
         logPath: Path,
+        tableIdentifier: Optional[TableIdentifier],
         currentVersion: Long,
         currentMetadata: AbstractMetadata,
         currentProtocol: AbstractProtocol): java.util.Map[String, String] =
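Note: the registerTable change is source-compatible for callers that have no catalog identity for the table; they pass Optional.empty() for the new tableIdentifier parameter, exactly as the FS-to-CC conversion in this file now does. A hypothetical caller (names are illustrative, signature taken from the hunks above):

import java.util.Map;
import java.util.Optional;

import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.actions.AbstractProtocol;
import org.apache.hadoop.fs.Path;

// Hypothetical helper; mirrors the registerTable call site in this file.
class RegisterTableCaller {
  static Map<String, String> register(
      CommitCoordinatorClient client,
      Path logPath,
      long currentVersion,
      AbstractMetadata currentMetadata,
      AbstractProtocol currentProtocol) {
    // No catalog identity is threaded through yet, so pass Optional.empty().
    return client.registerTable(
        logPath, Optional.empty(), currentVersion, currentMetadata, currentProtocol);
  }
}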
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.delta.coordinatedcommits
 
 import java.nio.file.FileAlreadyExistsException
-import java.util.UUID
+import java.util.{Optional, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.util.FileNames
 import io.delta.storage.LogStore
-import io.delta.storage.commit.{CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, UpdatedActions}
+import io.delta.storage.commit.{CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, TableDescriptor, TableIdentifier, UpdatedActions}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 
@@ -64,11 +64,11 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
   override def commit(
       logStore: LogStore,
       hadoopConf: Configuration,
-      logPath: Path,
-      coordinatedCommitsTableConf: java.util.Map[String, String],
+      tableDesc: TableDescriptor,
       commitVersion: Long,
       actions: java.util.Iterator[String],
       updatedActions: UpdatedActions): CommitResponse = {
+    val logPath = tableDesc.getLogPath
     val executionObserver = TransactionExecutionObserver.getObserver
     val tablePath = CoordinatedCommitsUtils.getTablePath(logPath)
     if (commitVersion == 0) {
@@ -86,8 +86,7 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
       backfillToVersion(
         logStore,
         hadoopConf,
-        logPath,
-        coordinatedCommitsTableConf,
+        tableDesc,
         commitVersion - 1,
         null)
     }
@@ -103,7 +102,7 @@
         logStore,
         hadoopConf,
         logPath,
-        coordinatedCommitsTableConf.asScala.toMap,
+        tableDesc.getTableConf.asScala.toMap,
         commitVersion,
         fileStatus,
         commitTimestamp)
@@ -122,13 +121,7 @@
       logInfo(log"Making sure commits are backfilled till " +
         log"${MDC(DeltaLogKeys.VERSION, commitVersion)} " +
         log"version for table ${MDC(DeltaLogKeys.PATH, tablePath.toString)}")
-      backfillToVersion(
-        logStore,
-        hadoopConf,
-        logPath,
-        coordinatedCommitsTableConf,
-        commitVersion,
-        null)
+      backfillToVersion(logStore, hadoopConf, tableDesc, commitVersion, null)
     }
     logInfo(log"Commit ${MDC(DeltaLogKeys.VERSION, commitVersion)} done successfully on table " +
       log"${MDC(DeltaLogKeys.PATH, tablePath)}")
@@ -150,18 +143,18 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
   override def backfillToVersion(
       logStore: LogStore,
       hadoopConf: Configuration,
-      logPath: Path,
-      coordinatedCommitsTableConf: java.util.Map[String, String],
+      tableDesc: TableDescriptor,
       version: Long,
       lastKnownBackfilledVersionOpt: java.lang.Long): Unit = {
+    val logPath = tableDesc.getLogPath
     // Confirm the last backfilled version by checking the backfilled delta file's existence.
     val validLastKnownBackfilledVersionOpt = Option(lastKnownBackfilledVersionOpt)
       .filter { version =>
         val fs = logPath.getFileSystem(hadoopConf)
         fs.exists(FileNames.unsafeDeltaFile(logPath, version))
       }
     val startVersionOpt: Long = validLastKnownBackfilledVersionOpt.map(_ + 1).map(Long.box).orNull
-    getCommits(logPath, coordinatedCommitsTableConf, startVersionOpt, version)
+    getCommits(tableDesc, startVersionOpt, version)
       .getCommits.asScala
       .foreach { commit =>
         backfill(logStore, hadoopConf, logPath, commit.getVersion, commit.getFileStatus)
@@ -16,6 +16,7 @@
 
 package org.apache.spark.sql.delta.coordinatedcommits
 
+import java.util.{Map => JMap, Optional}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -24,13 +25,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import io.delta.storage.LogStore
-import io.delta.storage.commit.{
-  Commit => JCommit,
-  CommitCoordinatorClient,
-  CommitFailedException => JCommitFailedException,
-  CommitResponse,
-  GetCommitsResponse => JGetCommitsResponse
-}
+import io.delta.storage.commit.{Commit => JCommit, CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, TableIdentifier}
 import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -150,14 +145,13 @@ class InMemoryCommitCoordinator(val batchSize: Long)
   }
 
   override def getCommits(
-      logPath: Path,
-      coordinatedCommitsTableConf: java.util.Map[String, String],
+      tableDesc: TableDescriptor,
       startVersion: java.lang.Long,
       endVersion: java.lang.Long): JGetCommitsResponse = {
-    withReadLock[JGetCommitsResponse](logPath) {
+    withReadLock[JGetCommitsResponse](tableDesc.getLogPath) {
       val startVersionOpt: Option[Long] = Option(startVersion).map(_.toLong)
       val endVersionOpt: Option[Long] = Option(endVersion).map(_.toLong)
-      val tableData = perTableMap.get(logPath)
+      val tableData = perTableMap.get(tableDesc.getLogPath)
       val effectiveStartVersion = startVersionOpt.getOrElse(0L)
       // Calculate the end version for the range, or use the last key if endVersion is not provided
       val effectiveEndVersion = endVersionOpt.getOrElse(
@@ -187,9 +181,10 @@
 
   override def registerTable(
       logPath: Path,
+      tableIdentifier: Optional[TableIdentifier],
       currentVersion: Long,
       currentMetadata: AbstractMetadata,
-      currentProtocol: AbstractProtocol): java.util.Map[String, String] = {
+      currentProtocol: AbstractProtocol): JMap[String, String] = {
     val newPerTableData = new PerTableData(currentVersion + 1)
     perTableMap.compute(logPath, (_, existingData) => {
       if (existingData != null) {
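Note: getCommits now keys off the TableDescriptor as well. A hedged sketch of a read path follows (the helper name is hypothetical); an empty table conf is enough for the in-memory coordinator used in tests, as DeltaLogSuite below shows, while real coordinators such as the DynamoDB one read the conf to locate the table entry.

import java.util.Collections;
import java.util.Optional;

import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.GetCommitsResponse;
import io.delta.storage.commit.TableDescriptor;
import org.apache.hadoop.fs.Path;

// Hypothetical helper; a null endVersion means "up to the latest version",
// matching the call sites in this commit.
class GetCommitsCaller {
  static GetCommitsResponse commitsFrom(
      CommitCoordinatorClient client, Path logPath, long startVersion) {
    TableDescriptor tableDesc = new TableDescriptor(
        logPath, Optional.empty(), Collections.emptyMap());
    return client.getCommits(tableDesc, startVersion, null);
  }
}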
@@ -16,16 +16,13 @@
 
 package org.apache.spark.sql.delta.coordinatedcommits
 
+import java.util.Optional
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.delta.storage.{LogStore, LogStoreInverseAdaptor}
-import io.delta.storage.commit.{
-  CommitCoordinatorClient => JCommitCoordinatorClient,
-  CommitResponse,
-  GetCommitsResponse => JGetCommitsResponse,
-  UpdatedActions
-}
+import io.delta.storage.commit.{CommitCoordinatorClient => JCommitCoordinatorClient, CommitResponse, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, UpdatedActions}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -48,15 +45,16 @@ case class TableCommitCoordinatorClient(
     hadoopConf: Configuration,
     logStore: LogStore) {
 
+  val tableDesc = new TableDescriptor(logPath, Optional.empty(), tableConf.asJava)
+
   def commit(
       commitVersion: Long,
       actions: Iterator[String],
       updatedActions: UpdatedActions): CommitResponse = {
     commitCoordinatorClient.commit(
       LogStoreInverseAdaptor(logStore, hadoopConf),
       hadoopConf,
-      logPath,
-      tableConf.asJava,
+      tableDesc,
       commitVersion,
       actions.asJava,
       updatedActions)
@@ -66,7 +64,9 @@
       startVersion: Option[Long] = None,
       endVersion: Option[Long] = None): JGetCommitsResponse = {
     commitCoordinatorClient.getCommits(
-      logPath, tableConf.asJava, startVersion.map(Long.box).orNull, endVersion.map(Long.box).orNull)
+      tableDesc,
+      startVersion.map(Long.box).orNull,
+      endVersion.map(Long.box).orNull)
   }
 
   def backfillToVersion(
@@ -75,8 +75,7 @@
     commitCoordinatorClient.backfillToVersion(
       LogStoreInverseAdaptor(logStore, hadoopConf),
       hadoopConf,
-      logPath,
-      tableConf.asJava,
+      tableDesc,
       version,
       lastKnownBackfilledVersion.map(Long.box).orNull)
   }
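Design note: because TableCommitCoordinatorClient wraps a single table, it can build the TableDescriptor once as a field and reuse it across commit, getCommits, and backfillToVersion, rather than re-packing logPath and tableConf on every call. The identifier is Optional.empty() here, presumably until a catalog identity is threaded through from callers.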
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta
 
 import java.io.{BufferedReader, File, InputStreamReader, IOException}
 import java.nio.charset.StandardCharsets
-import java.util.Locale
+import java.util.{Locale, Optional}
 
 import scala.collection.JavaConverters._
 import scala.language.postfixOps
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import io.delta.storage.commit.TableDescriptor
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.permission.FsPermission
 
@@ -506,8 +507,9 @@ class DeltaLogSuite extends QueryTest
       // file.
       val oc = CommitCoordinatorProvider.getCommitCoordinatorClient(
         "tracking-in-memory", Map.empty[String, String], spark)
-      val commitResponse = oc.getCommits(
-        deltaLog.logPath, Map.empty[String, String].asJava, 2, null)
+      val tableDesc =
+        new TableDescriptor(deltaLog.logPath, Optional.empty(), Map.empty[String, String].asJava)
+      val commitResponse = oc.getCommits(tableDesc, 2, null)
       if (!commitResponse.getCommits.isEmpty) {
         val path = commitResponse.getCommits.asScala.last.getFileStatus.getPath
         fs.delete(path, true)
@@ -619,8 +621,9 @@ class DeltaLogSuite extends QueryTest
       // file.
       val oc = CommitCoordinatorProvider.getCommitCoordinatorClient(
         "tracking-in-memory", Map.empty[String, String], spark)
-      val commitResponse = oc.getCommits(
-        log.logPath, Map.empty[String, String].asJava, 1, null)
+      val tableDesc =
+        new TableDescriptor(log.logPath, Optional.empty(), Map.empty[String, String].asJava)
+      val commitResponse = oc.getCommits(tableDesc, 1, null)
       if (!commitResponse.getCommits.isEmpty) {
         commitFilePath = commitResponse.getCommits.asScala.head.getFileStatus.getPath
       }