
Commit 26249f1

ericl authored and zsxwing committed
[SC-5631] Fix scala 2.10 build and use thread-pool util
## What changes were proposed in this pull request?

Fix scala 2.10 build and use thread-pool util

## How was this patch tested?

Built manually.

Author: Eric Liang <ekl@databricks.com>

Closes apache#176 from ericl/fix-build-2.
1 parent a3f4c53 · commit 26249f1


4 files changed: +8 −21 lines


sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala

Lines changed: 4 additions & 18 deletions

```diff
@@ -23,6 +23,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.ThreadUtils
 
 /**
  * File commit protocol optimized for cloud storage. Files are written directly to their final
@@ -159,24 +160,9 @@ object DatabricksAtomicCommitProtocol extends Logging {
   private val sparkSession = SparkSession.builder.getOrCreate()
 
   import scala.collection.parallel.ThreadPoolTaskSupport
-  import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
-
-  private lazy val tasksupport = new ThreadPoolTaskSupport({
-    val pool = new ThreadPoolExecutor(
-      100,
-      100,
-      100L,
-      TimeUnit.SECONDS,
-      new LinkedBlockingQueue[Runnable])
-    pool.setThreadFactory(new ThreadFactory {
-      override def newThread(task: Runnable): Thread = {
-        val thread = new Thread(task, "DatabricksAtomicCommitProtocolWorker")
-        thread.setDaemon(true)
-        thread
-      }
-    })
-    pool
-  })
+
+  private lazy val tasksupport = new ThreadPoolTaskSupport(
+    ThreadUtils.newDaemonCachedThreadPool("db-atomic-commit-worker", 100))
 
   /**
    * Traverses the given directories and cleans up uncommitted or garbage files and markers. A
```
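The replacement leans on Spark's `ThreadUtils` helper instead of hand-rolling an executor. As a rough sketch of what the deleted boilerplate amounts to (this is not Spark's actual `ThreadUtils` source; the parameter names and the 60-second keep-alive are assumptions), the helper builds a bounded pool of named daemon threads:

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Bounded pool of named daemon threads; idle threads time out, so it behaves
// like a cached pool capped at maxThreads.
def newDaemonCachedThreadPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(task: Runnable): Thread = {
      val t = new Thread(task, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true) // daemon threads never keep the JVM alive on shutdown
      t
    }
  }
  val pool = new ThreadPoolExecutor(
    maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable], factory)
  pool.allowCoreThreadTimeOut(true) // assumed: let idle core threads exit
  pool
}
```

The resulting executor is handed straight to `ThreadPoolTaskSupport`, so the surrounding parallel-collection code needs no other change.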

sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -8,7 +8,7 @@
 
 package org.apache.spark.sql.transaction
 
-import java.io._
+import java.io.{File, InputStream, InputStreamReader, IOException, OutputStream}
 import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable
```
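The commit does not say which name the wildcard was colliding with, but narrowing `java.io._` to an explicit list is a common fix for accidental shadowing. A hypothetical illustration of the failure mode:

```scala
object WildcardShadowing {
  import java.io._ // pulls java.io.Console into scope, shadowing scala.Console

  def main(args: Array[String]): Unit = {
    // Console.println("hi")  // would not compile: Console is now java.io.Console
    scala.Console.println(new File("/tmp").getPath) // qualifying disambiguates
  }
}
```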

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -402,7 +402,8 @@ object PartitioningAwareFileIndex extends Logging {
     }
 
     val allLeafStatuses = {
-      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+      val (dirs: Array[FileStatus], topLevelFiles: Array[FileStatus]) =
+        statuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = sessionOpt match {
         case Some(session) =>
           bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
```
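Annotating both binders spells out the tuple's element types, which the stricter Scala 2.10 compiler otherwise has to infer through the pattern definition; 2.11+ accepts the unannotated form. A minimal standalone sketch, assuming Hadoop's `FileStatus` constructor and made-up sample paths:

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// Hypothetical sample data: one directory, one file.
val statuses: Array[FileStatus] = Array(
  new FileStatus(0L, true, 1, 0L, 0L, new Path("/data/part=0")),
  new FileStatus(1L, false, 1, 0L, 0L, new Path("/data/_SUCCESS")))

// The 2.10-friendly destructuring from the diff.
val (dirs: Array[FileStatus], topLevelFiles: Array[FileStatus]) =
  statuses.partition(_.isDirectory)
```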

sql/core/src/test/scala/com/databricks/sql/acl/InMemoryAclClient.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -30,7 +30,7 @@ class InMemoryAclClient(var underlyingPrincipal: Principal) extends AclClient {
       principal: Option[Principal] = None,
       securable: Securable): Seq[Permission] = {
     allPermissions.filter { perm =>
-      (principal.isEmpty || principal.contains(perm.principal)) && (securable == perm.securable)
+      (principal.isEmpty || principal == Some(perm.principal)) && (securable == perm.securable)
     }.toSeq
   }
 
```
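`Option.contains` only exists from Scala 2.11 onward, so comparing against `Some(...)` is the 2.10-compatible spelling of the same check:

```scala
// Both lines test whether an Option holds a given value.
val principal: Option[String] = Some("alice")

assert(principal == Some("alice")) // compiles on Scala 2.10 and later
// principal.contains("alice")     // Option.contains arrived in 2.11
```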
