Commit 6d13d00

Merge remote-tracking branch 'origin/master' into json-line-sep
# Conflicts:
#   python/pyspark/sql/tests.py
#   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
#   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
#   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
2 parents: f99c1e1 + 4d37008

118 files changed: +1913 −2015 lines.

Note: this is a large commit, so some of the changed files are hidden by default; only a subset of the diffs is reproduced below.

bin/docker-image-tool.sh

Lines changed: 7 additions & 2 deletions
@@ -64,9 +64,11 @@ function build {
     error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
   fi

+  local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+
   docker build "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
-    -f "$IMG_PATH/spark/Dockerfile" .
+    -f "$DOCKERFILE" .
 }

 function push {
@@ -84,6 +86,7 @@ Commands:
   push        Push a pre-built image to a registry. Requires a repository address to be provided.

 Options:
+  -f file     Dockerfile to build. By default builds the Dockerfile shipped with Spark.
   -r repo     Repository address.
   -t tag      Tag to apply to the built image, or to identify the image to be pushed.
   -m          Use minikube's Docker daemon.
@@ -113,10 +116,12 @@ fi

 REPO=
 TAG=
-while getopts mr:t: option
+DOCKERFILE=
+while getopts f:mr:t: option
 do
   case "${option}"
   in
+    f) DOCKERFILE=${OPTARG};;
     r) REPO=${OPTARG};;
     t) TAG=${OPTARG};;
     m)

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

Lines changed: 40 additions & 8 deletions
@@ -57,12 +57,43 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   public Object getBaseObject() { return base; }
   public long getBaseOffset() { return offset; }

-  private static int[] bytesOfCodePointInUTF8 = {2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
-    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
-    3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
-    4, 4, 4, 4, 4, 4, 4, 4,
-    5, 5, 5, 5,
-    6, 6};
+  /**
+   * A char in UTF-8 encoding can take 1-4 bytes depending on the first byte which
+   * indicates the size of the char. See Unicode standard in page 126, Table 3-6:
+   * http://www.unicode.org/versions/Unicode10.0.0/UnicodeStandard-10.0.pdf
+   *
+   * Binary    Hex          Comments
+   * 0xxxxxxx  0x00..0x7F   Only byte of a 1-byte character encoding
+   * 10xxxxxx  0x80..0xBF   Continuation bytes (1-3 continuation bytes)
+   * 110xxxxx  0xC0..0xDF   First byte of a 2-byte character encoding
+   * 1110xxxx  0xE0..0xEF   First byte of a 3-byte character encoding
+   * 11110xxx  0xF0..0xF7   First byte of a 4-byte character encoding
+   *
+   * As a consequence of the well-formedness conditions specified in
+   * Table 3-7 (page 126), the following byte values are disallowed in UTF-8:
+   * C0–C1, F5–FF.
+   */
+  private static byte[] bytesOfCodePointInUTF8 = {
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x00..0x0F
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x10..0x1F
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x20..0x2F
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x30..0x3F
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x40..0x4F
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x50..0x5F
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x60..0x6F
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x70..0x7F
+    // Continuation bytes cannot appear as the first byte
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x80..0x8F
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x90..0x9F
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0xA0..0xAF
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0xB0..0xBF
+    0, 0, // 0xC0..0xC1 - disallowed in UTF-8
+    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, // 0xC2..0xCF
+    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, // 0xD0..0xDF
+    3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, // 0xE0..0xEF
+    4, 4, 4, 4, 4, // 0xF0..0xF4
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 // 0xF5..0xFF - disallowed in UTF-8
+  };

   private static final boolean IS_LITTLE_ENDIAN =
     ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
@@ -187,8 +218,9 @@ public void writeTo(OutputStream out) throws IOException {
    * @param b The first byte of a code point
    */
   private static int numBytesForFirstByte(final byte b) {
-    final int offset = (b & 0xFF) - 192;
-    return (offset >= 0) ? bytesOfCodePointInUTF8[offset] : 1;
+    final int offset = b & 0xFF;
+    byte numBytes = bytesOfCodePointInUTF8[offset];
+    return (numBytes == 0) ? 1: numBytes; // Skip the first byte disallowed in UTF-8
   }

   /**
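The practical effect of the new 256-entry table is that numBytesForFirstByte is now defined for every possible first byte: the disallowed values 0xC0..0xC1 and 0xF5..0xFF, as well as continuation bytes, yield a width of 1, so malformed input is skipped one byte at a time rather than being given the 2-6 byte widths implied by the old table. The Scala sketch below mirrors that behaviour by deriving the width from byte ranges instead of copying the table; Utf8FirstByte is a hypothetical name used only for illustration, not part of Spark.

object Utf8FirstByte {
  // Mirrors the patched UTF8String.numBytesForFirstByte: first bytes that cannot
  // start a well-formed UTF-8 sequence fall through to a width of 1 and are skipped.
  def numBytesForFirstByte(b: Byte): Int = {
    val v = b & 0xFF
    if (v < 0x80) 1                     // 0xxxxxxx: 1-byte char
    else if (v >= 0xC2 && v <= 0xDF) 2  // 110xxxxx: 2-byte char (0xC0..0xC1 disallowed)
    else if (v >= 0xE0 && v <= 0xEF) 3  // 1110xxxx: 3-byte char
    else if (v >= 0xF0 && v <= 0xF4) 4  // 11110xxx: 4-byte char (0xF5..0xFF disallowed)
    else 1                              // continuation or disallowed byte: skip one byte
  }

  def main(args: Array[String]): Unit = {
    println(numBytesForFirstByte(0xC2.toByte)) // 2: first byte of "¡" (0xC2 0xA1)
    println(numBytesForFirstByte(0x80.toByte)) // 1: continuation byte, skipped
    println(numBytesForFirstByte(0xF5.toByte)) // 1: disallowed in UTF-8, skipped
  }
}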

common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java

Lines changed: 22 additions & 1 deletion
@@ -58,8 +58,12 @@ private static void checkBasic(String str, int len) {
   @Test
   public void basicTest() {
     checkBasic("", 0);
-    checkBasic("hello", 5);
+    checkBasic("¡", 1); // 2 bytes char
+    checkBasic("ку", 2); // 2 * 2 bytes chars
+    checkBasic("hello", 5); // 5 * 1 byte chars
     checkBasic("大 千 世 界", 7);
+    checkBasic("︽﹋%", 3); // 3 * 3 bytes chars
+    checkBasic("\uD83E\uDD19", 1); // 4 bytes char
   }

   @Test
@@ -791,4 +795,21 @@ public void trimRightWithTrimString() {
     assertEquals(fromString("头"), fromString("头a???/").trimRight(fromString("数?/*&^%a")));
     assertEquals(fromString("头"), fromString("头数b数数 [").trimRight(fromString(" []数b")));
   }
+
+  @Test
+  public void skipWrongFirstByte() {
+    int[] wrongFirstBytes = {
+      0x80, 0x9F, 0xBF, // Skip Continuation bytes
+      0xC0, 0xC2, // 0xC0..0xC1 - disallowed in UTF-8
+      // 0xF5..0xFF - disallowed in UTF-8
+      0xF5, 0xF6, 0xF7, 0xF8, 0xF9,
+      0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF
+    };
+    byte[] c = new byte[1];
+
+    for (int i = 0; i < wrongFirstBytes.length; ++i) {
+      c[0] = (byte)wrongFirstBytes[i];
+      assertEquals(fromBytes(c).numChars(), 1);
+    }
+  }
 }

core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

Lines changed: 25 additions & 9 deletions
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy

 import java.io.File
+import java.net.URI

 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.util.{MutableURLClassLoader, Utils}

 private[deploy] object DependencyUtils {
@@ -137,16 +138,31 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(path)
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+            s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+        case (resolved, Some(namedAs)) => resolved.map(_ + "#" + namedAs)
+        case (resolved, _) => resolved
       }
     }.mkString(",")
   }

+  private def splitOnFragment(path: String): (URI, Option[String]) = {
+    val uri = Utils.resolveURI(path)
+    val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
+    (withoutFragment, Option(uri.getFragment))
+  }
+
+  private def resolveGlobPath(uri: URI, hadoopConf: Configuration): Array[String] = {
+    uri.getScheme match {
+      case "local" | "http" | "https" | "ftp" => Array(uri.toString)
+      case _ =>
+        val fs = FileSystem.get(uri, hadoopConf)
+        Option(fs.globStatus(new Path(uri))).map { status =>
+          status.filter(_.isFile).map(_.getPath.toUri.toString)
+        }.getOrElse(Array(uri.toString))
+    }
+  }
+
 }
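With the new helpers, an --archives entry such as "hdfs:/path/*.zip#archive3" keeps its "#archive3" alias after glob expansion, while a glob that matches more than one file under a single alias is rejected with a SparkException. The standalone Scala sketch below mirrors that logic; it substitutes plain java.net.URI for Spark's Utils.resolveURI and a hard-coded stub for resolveGlobPath, so the object name and file paths are illustrative only.

import java.net.URI

// Sketch of the fragment handling added to DependencyUtils.resolveGlobPaths:
// split "path#alias", resolve the glob on the base path, then either re-attach
// the alias or fail when the glob matched more than one file.
object GlobWithFragment {
  def splitOnFragment(path: String): (URI, Option[String]) = {
    val uri = new URI(path) // Spark uses Utils.resolveURI, which also handles bare local paths
    (new URI(uri.getScheme, uri.getSchemeSpecificPart, null), Option(uri.getFragment))
  }

  // Stand-in for resolveGlobPath: a fixed glob result, for illustration only.
  def resolveGlobPath(base: URI): Array[String] = base.toString match {
    case "file:/tmp/archives/*.zip" =>
      Array("file:/tmp/archives/first.zip", "file:/tmp/archives/second.zip")
    case other => Array(other)
  }

  def resolve(path: String): Array[String] = {
    val (base, fragment) = splitOnFragment(path)
    (resolveGlobPath(base), fragment) match {
      case (resolved, Some(_)) if resolved.length > 1 =>
        sys.error(s"$base resolves ambiguously to multiple files: ${resolved.mkString(",")}")
      case (resolved, Some(alias)) => resolved.map(_ + "#" + alias)
      case (resolved, _) => resolved
    }
  }

  def main(args: Array[String]): Unit = {
    // Alias is preserved on a single match.
    println(resolve("file:/tmp/archives/single.zip#archive3").mkString(","))
    // resolve("file:/tmp/archives/*.zip#archive3") would fail: two files under one alias.
  }
}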

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 13 additions & 2 deletions
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw e
+    }
+  }
+
+  private def doPrepareSubmitEnvironment(
+      args: SparkSubmitArguments,
+      conf: Option[HadoopConfiguration] = None)
+      : (Seq[String], Seq[String], SparkConf, String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
@@ -320,8 +333,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
         printErrorAndExit("Python applications are currently not supported for Kubernetes.")
       case (KUBERNETES, _) if args.isR =>
         printErrorAndExit("R applications are currently not supported for Kubernetes.")
-      case (KUBERNETES, CLIENT) =>
-        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
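The try/catch wrapper is what turns a SparkException (for example, the ambiguous-glob error from DependencyUtils) into a printed error and exit rather than a raw stack trace. A minimal sketch of the same pattern, assuming hypothetical stand-ins for printErrorAndExit, SparkException and doPrepareSubmitEnvironment rather than the real Spark classes:

// Sketch of the wrapper pattern added to SparkSubmit.prepareSubmitEnvironment.
object SubmitErrorWrapper {
  final class SparkException(msg: String) extends Exception(msg)

  private def reportAndExit(msg: String): Unit = {
    Console.err.println(s"Error: $msg")
    // The real printErrorAndExit terminates the JVM; tests swap in an exit hook
    // so they can assert on the message (see testPrematureExit in SparkSubmitSuite).
  }

  def prepareOrFail[T](prepare: => T): T =
    try {
      prepare
    } catch {
      case e: SparkException =>
        reportAndExit(e.getMessage)
        throw e // unreachable after a real exit, but keeps the method well-typed here
    }
}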

core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala

Lines changed: 1 addition & 1 deletion
@@ -143,7 +143,7 @@ private[spark] object ShutdownHookManager extends Logging {
   }

   /**
-   * Adds a shutdown hook with the given priority. Hooks with lower priority values run
+   * Adds a shutdown hook with the given priority. Hooks with higher priority values run
    * first.
    *
    * @param hook The code to run during shutdown.
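The corrected wording matters when ordering hooks. A small sketch, assuming the curried addShutdownHook(priority)(hook) signature and a caller compiled inside the org.apache.spark package tree (the object is private[spark]); the package name and priority values are arbitrary illustration choices:

package org.apache.spark.demo // hypothetical package; needed because ShutdownHookManager is private[spark]

import org.apache.spark.util.ShutdownHookManager

object HookOrderDemo {
  def register(): Unit = {
    // A higher priority value runs first, per the corrected scaladoc.
    ShutdownHookManager.addShutdownHook(100) { () => println("runs first (priority 100)") }
    ShutdownHookManager.addShutdownHook(10) { () => println("runs second (priority 10)") }
  }
}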

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 34 additions & 7 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io._
 import java.net.URI
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}

 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -606,10 +606,13 @@ class SparkSubmitSuite
   }

   test("resolves command line argument paths correctly") {
-    val jars = "/jar1,/jar2" // --jars
-    val files = "local:/file1,file2" // --files
-    val archives = "file:/archive1,archive2" // --archives
-    val pyFiles = "py-file1,py-file2" // --py-files
+    val dir = Utils.createTempDir()
+    val archive = Paths.get(dir.toPath.toString, "single.zip")
+    Files.createFile(archive)
+    val jars = "/jar1,/jar2"
+    val files = "local:/file1,file2"
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+    val pyFiles = "py-file1,py-file2"

     // Test jars and files
     val clArgs = Seq(
@@ -636,9 +639,10 @@ class SparkSubmitSuite
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
     appArgs2.files should be (Utils.resolveURIs(files))
-    appArgs2.archives should be (Utils.resolveURIs(archives))
+    appArgs2.archives should fullyMatch regex ("file:/archive1,file:.*#archive3")
     conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
-    conf2.get("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+    conf2.get("spark.yarn.dist.archives") should fullyMatch regex
+      ("file:/archive1,file:.*#archive3")

     // Test python files
     val clArgs3 = Seq(
@@ -657,6 +661,29 @@ class SparkSubmitSuite
     conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }

+  test("ambiguous archive mapping results in error message") {
+    val dir = Utils.createTempDir()
+    val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+    val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+    Files.createFile(archive1)
+    Files.createFile(archive2)
+    val jars = "/jar1,/jar2"
+    val files = "local:/file1,file2"
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+    val pyFiles = "py-file1,py-file2"
+
+    // Test files and archives (Yarn)
+    val clArgs2 = Seq(
+      "--master", "yarn",
+      "--class", "org.SomeClass",
+      "--files", files,
+      "--archives", archives,
+      "thejar.jar"
+    )
+
+    testPrematureExit(clArgs2.toArray, "resolves ambiguously to multiple files")
+  }
+
   test("resolves config paths correctly") {
     val jars = "/jar1,/jar2" // spark.jars
     val files = "local:/file1,file2" // spark.files / spark.yarn.dist.files
