Skip to content

Commit 6fd4c00

Browse files
committed
Removing potentially dangerous method
1 parent 7fbd7b4 commit 6fd4c00

File tree

2 files changed

+7
-29
lines changed

2 files changed

+7
-29
lines changed

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 4 additions & 28 deletions
```diff
@@ -284,36 +284,12 @@ private[spark] object ThreadUtils {
     try {
       implicit val ec = ExecutionContext.fromExecutor(pool)
 
-      parmap(in)(f)
+      val futures = in.map(x => Future(f(x)))
+      val futureSeq = Future.sequence(futures)
+
+      awaitResult(futureSeq, Duration.Inf)
     } finally {
       pool.shutdownNow()
     }
   }
-
-  /**
-   * Transforms input collection by applying the given function to each element in parallel fashion.
-   * Comparing to the map() method of Scala parallel collections, this method can be interrupted
-   * at any time. This is useful on canceling of task execution, for example.
-   *
-   * @param in - the input collection which should be transformed in parallel.
-   * @param f - the lambda function will be applied to each element of `in`.
-   * @param ec - an execution context for parallel applying of the given function `f`.
-   * @tparam I - the type of elements in the input collection.
-   * @tparam O - the type of elements in resulted collection.
-   * @return new collection in which each element was given from the input collection `in` by
-   *         applying the lambda function `f`.
-   */
-  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
-      (in: Col[I])
-      (f: I => O)
-      (implicit
-        cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
-        cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]], // for Future.sequence
-        ec: ExecutionContext
-      ): Col[O] = {
-    val futures = in.map(x => Future(f(x)))
-    val futureSeq = Future.sequence(futures)
-
-    awaitResult(futureSeq, Duration.Inf)
-  }
 }
```

streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -315,7 +315,9 @@ private[streaming] object FileBasedWriteAheadLog {
     implicit val ec = executionContext
 
     source.grouped(groupSize).flatMap { group =>
-      ThreadUtils.parmap(group)(handler)
+      val parallelCollection = group.par
+      parallelCollection.tasksupport = taskSupport
+      parallelCollection.map(handler)
     }.flatten
   }
 }
```

0 commit comments

Comments (0)