Commit 0030ba0

Author: Andrew Or

Merge branch 'master' of github.com:apache/spark into sql-local-tests-cleanup

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
	sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala

2 parents: a93a260 + 09b7e7c

85 files changed: +1774 additions, −287 deletions


R/pkg/DESCRIPTION

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R frontend for Spark
-Version: 1.5.0
+Version: 1.6.0
 Date: 2013-09-09
 Author: The Apache Software Foundation
 Maintainer: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

assembly/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/src/main/scala/org/apache/spark/MapOutputStatistics.scala

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Holds statistics about the output sizes in a map stage. May become a DeveloperApi in the future.
+ *
+ * @param shuffleId ID of the shuffle
+ * @param bytesByPartitionId approximate number of output bytes for each map output partition
+ *                           (may be inexact due to use of compressed map statuses)
+ */
+private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
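For orientation, here is a minimal, hypothetical sketch of how a consumer might read these statistics. The helper object, its method, and the printed sizes are illustrative only and not part of this commit; only the two fields defined above are assumed.

// Hypothetical usage sketch (not in this commit): summarizing a
// MapOutputStatistics instance.
package org.apache.spark // required because MapOutputStatistics is private[spark]

object MapOutputStatisticsSketch {
  def summarize(stats: MapOutputStatistics): String = {
    val sizes = stats.bytesByPartitionId
    val total = sizes.sum
    val largest = if (sizes.isEmpty) 0L else sizes.max
    s"shuffle ${stats.shuffleId}: ${sizes.length} partitions, " +
      s"$total bytes total, largest partition $largest bytes"
  }

  def main(args: Array[String]): Unit = {
    // Made-up byte counts for three reduce partitions of shuffle 0
    println(summarize(new MapOutputStatistics(0, Array(100L, 2500L, 400L))))
  }
}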

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 38 additions & 11 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark

 import java.io._
+import java.util.Arrays
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}

@@ -132,13 +133,43 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * describing the shuffle blocks that are stored at that block manager.
    */
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
-    : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
-    val startTime = System.currentTimeMillis
+    val statuses = getStatuses(shuffleId)
+    // Synchronize on the returned array because, on the driver, it gets mutated in place
+    statuses.synchronized {
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
+    }
+  }

+  /**
+   * Return statistics about all of the outputs for a given shuffle.
+   */
+  def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
+    val statuses = getStatuses(dep.shuffleId)
+    // Synchronize on the returned array because, on the driver, it gets mutated in place
+    statuses.synchronized {
+      val totalSizes = new Array[Long](dep.partitioner.numPartitions)
+      for (s <- statuses) {
+        for (i <- 0 until totalSizes.length) {
+          totalSizes(i) += s.getSizeForBlock(i)
+        }
+      }
+      new MapOutputStatistics(dep.shuffleId, totalSizes)
+    }
+  }
+
+  /**
+   * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize
+   * on this array when reading it, because on the driver, we may be changing it in place.
+   *
+   * (It would be nice to remove this restriction in the future.)
+   */
+  private def getStatuses(shuffleId: Int): Array[MapStatus] = {
     val statuses = mapStatuses.get(shuffleId).orNull
     if (statuses == null) {
       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+      val startTime = System.currentTimeMillis
       var fetchedStatuses: Array[MapStatus] = null
       fetching.synchronized {
         // Someone else is fetching it; wait for them to be done

@@ -160,7 +191,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       }

       if (fetchedStatuses == null) {
-        // We won the race to fetch the output locs; do so
+        // We won the race to fetch the statuses; do so
         logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
         // This try-finally prevents hangs due to timeouts:
         try {

@@ -175,22 +206,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
           }
         }
       }
-      logDebug(s"Fetching map output location for shuffle $shuffleId, reduce $reduceId took " +
+      logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
         s"${System.currentTimeMillis - startTime} ms")

       if (fetchedStatuses != null) {
-        fetchedStatuses.synchronized {
-          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
-        }
+        return fetchedStatuses
       } else {
         logError("Missing all output locations for shuffle " + shuffleId)
         throw new MetadataFetchFailedException(
-          shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
+          shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
       }
     } else {
-      statuses.synchronized {
-        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
-      }
+      return statuses
     }
   }
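To make the aggregation inside getStatistics concrete, here is a self-contained sketch of the same per-partition summing. MapStatusLike is a hypothetical stand-in for Spark's MapStatus, of which only getSizeForBlock is assumed; the byte counts in main are made up.

// Standalone sketch of the per-partition size aggregation performed by
// getStatistics above. MapStatusLike is a hypothetical stand-in for MapStatus.
object ShuffleStatsSketch {
  trait MapStatusLike {
    def getSizeForBlock(reduceId: Int): Long
  }

  // Sum each map task's output size for every reduce partition,
  // mirroring the nested loop in getStatistics.
  def aggregate(statuses: Array[MapStatusLike], numPartitions: Int): Array[Long] = {
    val totalSizes = new Array[Long](numPartitions)
    for (s <- statuses; i <- 0 until numPartitions) {
      totalSizes(i) += s.getSizeForBlock(i)
    }
    totalSizes
  }

  def main(args: Array[String]): Unit = {
    def status(sizes: Long*): MapStatusLike = new MapStatusLike {
      def getSizeForBlock(i: Int): Long = sizes(i)
    }
    // Two map tasks, three reduce partitions; the byte counts are made up.
    val statuses = Array(status(100L, 200L, 300L), status(400L, 500L, 600L))
    println(aggregate(statuses, 3).mkString(", ")) // prints 500, 700, 900
  }
}

In the tracker itself this loop runs under statuses.synchronized, because on the driver the returned array can be mutated in place, as the comments in the diff note.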
