Commit 5329091

Author: Davies Liu (committed)

Merge branch 'master' of github.com:apache/spark into fix_conf

Conflicts:
    core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

2 parents: 63f2972 + 46c6341

120 files changed: +1401 additions, -578 deletions

LICENSE
Lines changed: 0 additions & 12 deletions

@@ -712,18 +712,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-========================================================================
-For colt:
-========================================================================
-
-Copyright (c) 1999 CERN - European Organization for Nuclear Research.
-Permission to use, copy, modify, distribute and sell this software and its documentation for any purpose is hereby granted without fee, provided that the above copyright notice appear in all copies and that both that copyright notice and this permission notice appear in supporting documentation. CERN makes no representations about the suitability of this software for any purpose. It is provided "as is" without expressed or implied warranty.
-
-Packages hep.aida.*
-
-Written by Pavel Binko, Dino Ferrero Merlino, Wolfgang Hoschek, Tony Johnson, Andreas Pfeiffer, and others. Check the FreeHEP home page for more info. Permission to use and/or redistribute this work is granted under the terms of the LGPL License, with the exception that any usage related to military applications is expressly forbidden. The software and documentation made available under the terms of this license are provided with no warranty.
-
-
 ========================================================================
 For SnapTree:
 ========================================================================

README.md
Lines changed: 1 addition & 1 deletion

@@ -84,7 +84,7 @@ storage systems. Because the protocols have changed in different versions of
 Hadoop, you must build Spark against the same version that your cluster runs.
 
 Please refer to the build documentation at
-["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)
+["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-with-maven.html#specifying-the-hadoop-version)
 for detailed guidance on building for a particular distribution of Hadoop, including
 building for particular Hive and Hive Thriftserver distributions. See also
 ["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)

assembly/pom.xml
Lines changed: 4 additions & 0 deletions

@@ -146,6 +146,10 @@
                 <exclude>com/google/common/base/Present*</exclude>
               </excludes>
             </relocation>
+            <relocation>
+              <pattern>org.apache.commons.math3</pattern>
+              <shadedPattern>org.spark-project.commons.math3</shadedPattern>
+            </relocation>
           </relocations>
           <transformers>
             <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />

core/pom.xml
Lines changed: 5 additions & 6 deletions

@@ -85,8 +85,6 @@
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
-      <version>3.3</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
@@ -162,10 +160,6 @@
       <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
       <version>3.2.10</version>
     </dependency>
-    <dependency>
-      <groupId>colt</groupId>
-      <artifactId>colt</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.mesos</groupId>
       <artifactId>mesos</artifactId>
@@ -247,6 +241,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-java</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>

core/src/main/scala/org/apache/spark/TestUtils.scala
Lines changed: 2 additions & 7 deletions

@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
@@ -64,12 +64,7 @@ private[spark] object TestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
-      val buffer = new Array[Byte](10240)
-      var nRead = 0
-      while (nRead <= 0) {
-        nRead = in.read(buffer, 0, buffer.length)
-        jarStream.write(buffer, 0, nRead)
-      }
+      ByteStreams.copy(in, jarStream)
      in.close()
     }
     jarStream.close()

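The loop removed above exited as soon as a single read() returned data (its condition was `nRead <= 0`), so only the first chunk of each file ever reached the jar; Guava's ByteStreams.copy streams until EOF. Below is a minimal, stand-alone sketch of the same pattern, not the Spark code itself; the file names are hypothetical and only Guava is assumed on the classpath.

```scala
// Sketch: copy a file into a jar entry with Guava's ByteStreams.copy
// instead of a hand-rolled read loop. File names are hypothetical.
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}

import com.google.common.io.ByteStreams

object JarCopySketch {
  def addFileToJar(file: File, jarStream: JarOutputStream): Unit = {
    jarStream.putNextEntry(new JarEntry(file.getName))
    val in = new FileInputStream(file)
    try {
      // Copies until EOF regardless of file size; returns the byte count.
      ByteStreams.copy(in, jarStream)
    } finally {
      in.close()
    }
    jarStream.closeEntry()
  }

  def main(args: Array[String]): Unit = {
    val jar = new JarOutputStream(new FileOutputStream("example.jar"))
    addFileToJar(new File("SomeClass.class"), jar)
    jar.close()
  }
}
```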
core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
Lines changed: 20 additions & 2 deletions

@@ -20,6 +20,8 @@ package org.apache.spark.broadcast
 import java.io.Serializable
 
 import org.apache.spark.SparkException
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 import scala.reflect.ClassTag
 
@@ -52,14 +54,16 @@ import scala.reflect.ClassTag
  * @param id A unique identifier for the broadcast variable.
  * @tparam T Type of the data contained in the broadcast variable.
  */
-abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
+abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
 
   /**
    * Flag signifying whether the broadcast variable is valid
   * (that is, not already destroyed) or not.
    */
   @volatile private var _isValid = true
 
+  private var _destroySite = ""
+
   /** Get the broadcasted value. */
   def value: T = {
     assertValid()
@@ -84,13 +88,26 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
     doUnpersist(blocking)
   }
 
+
+  /**
+   * Destroy all data and metadata related to this broadcast variable. Use this with caution;
+   * once a broadcast variable has been destroyed, it cannot be used again.
+   * This method blocks until destroy has completed
+   */
+  def destroy() {
+    destroy(blocking = true)
+  }
+
   /**
   * Destroy all data and metadata related to this broadcast variable. Use this with caution;
   * once a broadcast variable has been destroyed, it cannot be used again.
+   * @param blocking Whether to block until destroy has completed
   */
   private[spark] def destroy(blocking: Boolean) {
     assertValid()
     _isValid = false
+    _destroySite = Utils.getCallSite().shortForm
+    logInfo("Destroying %s (from %s)".format(toString, _destroySite))
     doDestroy(blocking)
   }
 
@@ -124,7 +141,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
   /** Check if this broadcast is valid. If not valid, exception is thrown. */
   protected def assertValid() {
     if (!_isValid) {
-      throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
+      throw new SparkException(
+        "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
     }
   }

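With this change destroy() becomes callable from user code, and a destroyed broadcast now reports the call site where it was destroyed. A hedged usage sketch follows; it assumes a local SparkContext, and the printed message is only paraphrased from the new exception text.

```scala
// Illustrative sketch of the new destroy() behaviour; not code from the commit.
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object BroadcastDestroySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("destroy-sketch"))
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

    // Normal use: tasks read the broadcast value.
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => lookup.value.getOrElse(word, 0))
      .collect()
    println(counts.mkString(","))

    // destroy() blocks until all data and metadata for the broadcast are removed.
    lookup.destroy()

    // Any further use throws a SparkException whose message now includes the
    // call site recorded in _destroySite.
    try {
      lookup.value
    } catch {
      case e: SparkException => println(e.getMessage)
    }

    sc.stop()
  }
}
```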
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Lines changed: 30 additions & 0 deletions

@@ -20,12 +20,15 @@ package org.apache.spark.deploy
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 import scala.collection.JavaConversions._
 
@@ -121,6 +124,33 @@ class SparkHadoopUtil extends Logging {
     UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
   }
 
+  /**
+   * Returns a function that can be called to find Hadoop FileSystem bytes read. If
+   * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
+   * return the bytes read on r since t. Reflection is required because thread-level FileSystem
+   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
+   * Returns None if the required method can't be found.
+   */
+  private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
+    : Option[() => Long] = {
+    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
+    val scheme = qualifiedPath.toUri().getScheme()
+    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+    try {
+      val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+      val statisticsDataClass =
+        Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
+      val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
+      val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
+      val baselineBytesRead = f()
+      Some(() => f() - baselineBytesRead)
+    } catch {
+      case e: NoSuchMethodException => {
+        logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
+        None
+      }
+    }
+  }
 }
 
 object SparkHadoopUtil {

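The returned callback is a delta against a baseline captured when the callback is created, so each reader thread gets its own running count of bytes read. A rough sketch of how a caller might use it; because the helper is private[spark], the sketch lives under the org.apache.spark package, and the surrounding metric-recording code is illustrative, not part of this commit.

```scala
// Sketch of using the thread-level bytes-read callback added above.
// Placed in org.apache.spark because getFSBytesReadOnThreadCallback is private[spark].
package org.apache.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil

object BytesReadSketch {
  def readWithMetrics(pathStr: String): Unit = {
    val conf = new Configuration()
    val path = new Path(pathStr)

    // None on Hadoop < 2.5, where thread-level FileSystem statistics
    // (HADOOP-10688) are not available.
    val bytesReadCallback: Option[() => Long] =
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(path, conf)

    // ... perform the actual read on this thread ...

    // The callback reports bytes read on this thread since it was created.
    val bytesRead = bytesReadCallback.map(f => f()).getOrElse(-1L)
    println(s"bytes read on this thread: $bytesRead")
  }
}
```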
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
Lines changed: 0 additions & 1 deletion

@@ -169,7 +169,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
   var bytesRead: Long = 0L
 }
 
-
 /**
  * :: DeveloperApi ::
  * Metrics pertaining to shuffle data read in a given task.

core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
Lines changed: 8 additions & 2 deletions

@@ -81,7 +81,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
     if (length < MIN_MEMORY_MAP_BYTES) {
       val buf = ByteBuffer.allocate(length.toInt)
-      channel.read(buf, offset)
+      channel.position(offset)
+      while (buf.remaining() != 0) {
+        if (channel.read(buf) == -1) {
+          throw new IOException("Reached EOF before filling buffer\n" +
+            s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+        }
+      }
       buf.flip()
       buf
     } else {
@@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     var is: FileInputStream = null
     try {
       is = new FileInputStream(file)
-      is.skip(offset)
+      ByteStreams.skipFully(is, offset)
       ByteStreams.limit(is, length)
     } catch {
       case e: IOException =>

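The old single call to channel.read(buf, offset) could legally return before the buffer was full, silently producing a short read; the fix positions the channel and loops until the buffer is filled, failing loudly on premature EOF (likewise, InputStream.skip may skip fewer bytes than asked, hence ByteStreams.skipFully). A self-contained sketch of that read-fully pattern on a plain FileChannel follows; the class name and file path are made up for illustration.

```scala
// Stand-alone sketch of the read-fully pattern used in the fix; not the Spark class itself.
import java.io.{File, IOException, RandomAccessFile}
import java.nio.ByteBuffer

object ReadFullySketch {
  /** Read exactly `length` bytes starting at `offset`, or throw if EOF comes first. */
  def readSegment(file: File, offset: Long, length: Int): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      val buf = ByteBuffer.allocate(length)
      channel.position(offset)
      // A single read() may return fewer bytes than requested, so keep reading
      // until the buffer is full; read() returning -1 means EOF came too early.
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException(
            s"Reached EOF before filling buffer: offset=$offset, remaining=${buf.remaining()}")
        }
      }
      buf.flip()
      buf
    } finally {
      channel.close()
    }
  }
}
```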
core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
Lines changed: 15 additions & 10 deletions

@@ -95,16 +95,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
     future.onSuccess { case message =>
       val bufferMessage = message.asInstanceOf[BufferMessage]
       val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
-
-      for (blockMessage <- blockMessageArray) {
-        if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
-          listener.onBlockFetchFailure(
-            new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId"))
-        } else {
-          val blockId = blockMessage.getId
-          val networkSize = blockMessage.getData.limit()
-          listener.onBlockFetchSuccess(
-            blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
+      // SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty.
+      if (blockMessageArray.isEmpty) {
+        listener.onBlockFetchFailure(
+          new SparkException(s"Received empty message from $cmId"))
+      } else {
+        for (blockMessage <- blockMessageArray) {
+          val msgType = blockMessage.getType
+          if (msgType != BlockMessage.TYPE_GOT_BLOCK) {
+            listener.onBlockFetchFailure(
+              new SparkException(s"Unexpected message ${msgType} received from $cmId"))
+          } else {
+            val blockId = blockMessage.getId
+            listener.onBlockFetchSuccess(
+              blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
+          }
         }
       }
     }(cm.futureExecContext)

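The SPARK-4064 guard matters because an empty decoded message list would otherwise invoke neither the success nor the failure callback, leaving the fetching side waiting indefinitely. A toy sketch of the same guard with simplified stand-in types (these are not Spark's classes):

```scala
// Toy illustration of the SPARK-4064 guard: when a fetch response decodes to an
// empty message list, report a failure instead of silently doing nothing.
final case class Message(kind: String, blockId: String, data: Array[Byte])

trait FetchListener {
  def onSuccess(blockId: String, data: Array[Byte]): Unit
  def onFailure(error: Exception): Unit
}

object EmptyResponseGuardSketch {
  def handleResponse(messages: Seq[Message], from: String, listener: FetchListener): Unit = {
    if (messages.isEmpty) {
      // Without this branch an empty response would invoke neither callback,
      // so the caller would never learn the fetch failed.
      listener.onFailure(new Exception(s"Received empty message from $from"))
    } else {
      for (msg <- messages) {
        if (msg.kind != "GOT_BLOCK") {
          listener.onFailure(new Exception(s"Unexpected message ${msg.kind} received from $from"))
        } else {
          listener.onSuccess(msg.blockId, msg.data)
        }
      }
    }
  }
}
```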