Skip to content

Commit c0f1021

Browse files
author
Davies Liu
committed
Merge branch 'master' of github.com:apache/spark into progress_api
2 parents 023afb3 + 68cb69d commit c0f1021

File tree

275 files changed

+9746
-2698
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

275 files changed

+9746
-2698
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ storage systems. Because the protocols have changed in different versions of
8484
Hadoop, you must build Spark against the same version that your cluster runs.
8585

8686
Please refer to the build documentation at
87-
["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)
87+
["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-with-maven.html#specifying-the-hadoop-version)
8888
for detailed guidance on building for a particular distribution of Hadoop, including
8989
building for particular Hive and Hive Thriftserver distributions. See also
9090
["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)

bin/pyspark2.cmd

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do (
5959
)
6060

6161
if [%PYTHON_FILE%] == [] (
62-
%PYSPARK_PYTHON%
62+
set PYSPARK_SHELL=1
63+
if [%IPYTHON%] == [1] (
64+
ipython %IPYTHON_OPTS%
65+
) else (
66+
%PYSPARK_PYTHON%
67+
)
6368
) else (
6469
echo.
6570
echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0.

bin/spark-class

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ case "$1" in
8181
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
8282
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
8383
if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
84-
OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
84+
if [[ $OSTYPE == darwin* ]]; then
85+
export DYLD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$DYLD_LIBRARY_PATH"
86+
else
87+
export LD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$LD_LIBRARY_PATH"
88+
fi
8589
fi
8690
if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
8791
OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"

core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@
4444
</exclusion>
4545
</exclusions>
4646
</dependency>
47+
<dependency>
48+
<groupId>org.apache.spark</groupId>
49+
<artifactId>network</artifactId>
50+
<version>${project.version}</version>
51+
</dependency>
4752
<dependency>
4853
<groupId>net.java.dev.jets3t</groupId>
4954
<artifactId>jets3t</artifactId>

core/src/main/java/org/apache/spark/util/collection/Sorter.java renamed to core/src/main/java/org/apache/spark/util/collection/TimSort.java

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,25 @@
2020
import java.util.Comparator;
2121

2222
/**
23-
* A port of the Android Timsort class, which utilizes a "stable, adaptive, iterative mergesort."
23+
* A port of the Android TimSort class, which utilizes a "stable, adaptive, iterative mergesort."
2424
* See the method comment on sort() for more details.
2525
*
2626
* This has been kept in Java with the original style in order to match very closely with the
27-
* Anroid source code, and thus be easy to verify correctness.
27+
* Android source code, and thus be easy to verify correctness. The class is package private; a
28+
* simple Scala wrapper, {@link org.apache.spark.util.collection.Sorter}, is available to
29+
* package org.apache.spark.
2830
*
2931
* The purpose of the port is to generalize the interface to the sort to accept input data formats
3032
* besides simple arrays where every element is sorted individually. For instance, the AppendOnlyMap
3133
* uses this to sort an Array with alternating elements of the form [key, value, key, value].
3234
* This generalization comes with minimal overhead -- see SortDataFormat for more information.
35+
*
36+
* We allow key reuse to prevent creating many key objects -- see SortDataFormat.
37+
*
38+
* @see org.apache.spark.util.collection.SortDataFormat
39+
* @see org.apache.spark.util.collection.Sorter
3340
*/
34-
class Sorter<K, Buffer> {
41+
class TimSort<K, Buffer> {
3542

3643
/**
3744
* This is the minimum sized sequence that will be merged. Shorter
@@ -54,7 +61,7 @@ class Sorter<K, Buffer> {
5461

5562
private final SortDataFormat<K, Buffer> s;
5663

57-
public Sorter(SortDataFormat<K, Buffer> sortDataFormat) {
64+
public TimSort(SortDataFormat<K, Buffer> sortDataFormat) {
5865
this.s = sortDataFormat;
5966
}
6067

@@ -91,7 +98,7 @@ public Sorter(SortDataFormat<K, Buffer> sortDataFormat) {
9198
*
9299
* @author Josh Bloch
93100
*/
94-
void sort(Buffer a, int lo, int hi, Comparator<? super K> c) {
101+
public void sort(Buffer a, int lo, int hi, Comparator<? super K> c) {
95102
assert c != null;
96103

97104
int nRemaining = hi - lo;
@@ -162,10 +169,13 @@ private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super
162169
if (start == lo)
163170
start++;
164171

172+
K key0 = s.newKey();
173+
K key1 = s.newKey();
174+
165175
Buffer pivotStore = s.allocate(1);
166176
for ( ; start < hi; start++) {
167177
s.copyElement(a, start, pivotStore, 0);
168-
K pivot = s.getKey(pivotStore, 0);
178+
K pivot = s.getKey(pivotStore, 0, key0);
169179

170180
// Set left (and right) to the index where a[start] (pivot) belongs
171181
int left = lo;
@@ -178,7 +188,7 @@ private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super
178188
*/
179189
while (left < right) {
180190
int mid = (left + right) >>> 1;
181-
if (c.compare(pivot, s.getKey(a, mid)) < 0)
191+
if (c.compare(pivot, s.getKey(a, mid, key1)) < 0)
182192
right = mid;
183193
else
184194
left = mid + 1;
@@ -235,13 +245,16 @@ private int countRunAndMakeAscending(Buffer a, int lo, int hi, Comparator<? supe
235245
if (runHi == hi)
236246
return 1;
237247

248+
K key0 = s.newKey();
249+
K key1 = s.newKey();
250+
238251
// Find end of run, and reverse range if descending
239-
if (c.compare(s.getKey(a, runHi++), s.getKey(a, lo)) < 0) { // Descending
240-
while (runHi < hi && c.compare(s.getKey(a, runHi), s.getKey(a, runHi - 1)) < 0)
252+
if (c.compare(s.getKey(a, runHi++, key0), s.getKey(a, lo, key1)) < 0) { // Descending
253+
while (runHi < hi && c.compare(s.getKey(a, runHi, key0), s.getKey(a, runHi - 1, key1)) < 0)
241254
runHi++;
242255
reverseRange(a, lo, runHi);
243256
} else { // Ascending
244-
while (runHi < hi && c.compare(s.getKey(a, runHi), s.getKey(a, runHi - 1)) >= 0)
257+
while (runHi < hi && c.compare(s.getKey(a, runHi, key0), s.getKey(a, runHi - 1, key1)) >= 0)
245258
runHi++;
246259
}
247260

@@ -468,11 +481,13 @@ private void mergeAt(int i) {
468481
}
469482
stackSize--;
470483

484+
K key0 = s.newKey();
485+
471486
/*
472487
* Find where the first element of run2 goes in run1. Prior elements
473488
* in run1 can be ignored (because they're already in place).
474489
*/
475-
int k = gallopRight(s.getKey(a, base2), a, base1, len1, 0, c);
490+
int k = gallopRight(s.getKey(a, base2, key0), a, base1, len1, 0, c);
476491
assert k >= 0;
477492
base1 += k;
478493
len1 -= k;
@@ -483,7 +498,7 @@ private void mergeAt(int i) {
483498
* Find where the last element of run1 goes in run2. Subsequent elements
484499
* in run2 can be ignored (because they're already in place).
485500
*/
486-
len2 = gallopLeft(s.getKey(a, base1 + len1 - 1), a, base2, len2, len2 - 1, c);
501+
len2 = gallopLeft(s.getKey(a, base1 + len1 - 1, key0), a, base2, len2, len2 - 1, c);
487502
assert len2 >= 0;
488503
if (len2 == 0)
489504
return;
@@ -517,10 +532,12 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator<
517532
assert len > 0 && hint >= 0 && hint < len;
518533
int lastOfs = 0;
519534
int ofs = 1;
520-
if (c.compare(key, s.getKey(a, base + hint)) > 0) {
535+
K key0 = s.newKey();
536+
537+
if (c.compare(key, s.getKey(a, base + hint, key0)) > 0) {
521538
// Gallop right until a[base+hint+lastOfs] < key <= a[base+hint+ofs]
522539
int maxOfs = len - hint;
523-
while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) > 0) {
540+
while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs, key0)) > 0) {
524541
lastOfs = ofs;
525542
ofs = (ofs << 1) + 1;
526543
if (ofs <= 0) // int overflow
@@ -535,7 +552,7 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator<
535552
} else { // key <= a[base + hint]
536553
// Gallop left until a[base+hint-ofs] < key <= a[base+hint-lastOfs]
537554
final int maxOfs = hint + 1;
538-
while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) <= 0) {
555+
while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs, key0)) <= 0) {
539556
lastOfs = ofs;
540557
ofs = (ofs << 1) + 1;
541558
if (ofs <= 0) // int overflow
@@ -560,7 +577,7 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator<
560577
while (lastOfs < ofs) {
561578
int m = lastOfs + ((ofs - lastOfs) >>> 1);
562579

563-
if (c.compare(key, s.getKey(a, base + m)) > 0)
580+
if (c.compare(key, s.getKey(a, base + m, key0)) > 0)
564581
lastOfs = m + 1; // a[base + m] < key
565582
else
566583
ofs = m; // key <= a[base + m]
@@ -587,10 +604,12 @@ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator
587604

588605
int ofs = 1;
589606
int lastOfs = 0;
590-
if (c.compare(key, s.getKey(a, base + hint)) < 0) {
607+
K key1 = s.newKey();
608+
609+
if (c.compare(key, s.getKey(a, base + hint, key1)) < 0) {
591610
// Gallop left until a[b+hint - ofs] <= key < a[b+hint - lastOfs]
592611
int maxOfs = hint + 1;
593-
while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) < 0) {
612+
while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs, key1)) < 0) {
594613
lastOfs = ofs;
595614
ofs = (ofs << 1) + 1;
596615
if (ofs <= 0) // int overflow
@@ -606,7 +625,7 @@ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator
606625
} else { // a[b + hint] <= key
607626
// Gallop right until a[b+hint + lastOfs] <= key < a[b+hint + ofs]
608627
int maxOfs = len - hint;
609-
while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) >= 0) {
628+
while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs, key1)) >= 0) {
610629
lastOfs = ofs;
611630
ofs = (ofs << 1) + 1;
612631
if (ofs <= 0) // int overflow
@@ -630,7 +649,7 @@ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator
630649
while (lastOfs < ofs) {
631650
int m = lastOfs + ((ofs - lastOfs) >>> 1);
632651

633-
if (c.compare(key, s.getKey(a, base + m)) < 0)
652+
if (c.compare(key, s.getKey(a, base + m, key1)) < 0)
634653
ofs = m; // key < a[b + m]
635654
else
636655
lastOfs = m + 1; // a[b + m] <= key
@@ -679,6 +698,9 @@ private void mergeLo(int base1, int len1, int base2, int len2) {
679698
return;
680699
}
681700

701+
K key0 = s.newKey();
702+
K key1 = s.newKey();
703+
682704
Comparator<? super K> c = this.c; // Use local variable for performance
683705
int minGallop = this.minGallop; // " " " " "
684706
outer:
@@ -692,7 +714,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) {
692714
*/
693715
do {
694716
assert len1 > 1 && len2 > 0;
695-
if (c.compare(s.getKey(a, cursor2), s.getKey(tmp, cursor1)) < 0) {
717+
if (c.compare(s.getKey(a, cursor2, key0), s.getKey(tmp, cursor1, key1)) < 0) {
696718
s.copyElement(a, cursor2++, a, dest++);
697719
count2++;
698720
count1 = 0;
@@ -714,7 +736,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) {
714736
*/
715737
do {
716738
assert len1 > 1 && len2 > 0;
717-
count1 = gallopRight(s.getKey(a, cursor2), tmp, cursor1, len1, 0, c);
739+
count1 = gallopRight(s.getKey(a, cursor2, key0), tmp, cursor1, len1, 0, c);
718740
if (count1 != 0) {
719741
s.copyRange(tmp, cursor1, a, dest, count1);
720742
dest += count1;
@@ -727,7 +749,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) {
727749
if (--len2 == 0)
728750
break outer;
729751

730-
count2 = gallopLeft(s.getKey(tmp, cursor1), a, cursor2, len2, 0, c);
752+
count2 = gallopLeft(s.getKey(tmp, cursor1, key0), a, cursor2, len2, 0, c);
731753
if (count2 != 0) {
732754
s.copyRange(a, cursor2, a, dest, count2);
733755
dest += count2;
@@ -784,6 +806,9 @@ private void mergeHi(int base1, int len1, int base2, int len2) {
784806
int cursor2 = len2 - 1; // Indexes into tmp array
785807
int dest = base2 + len2 - 1; // Indexes into a
786808

809+
K key0 = s.newKey();
810+
K key1 = s.newKey();
811+
787812
// Move last element of first run and deal with degenerate cases
788813
s.copyElement(a, cursor1--, a, dest--);
789814
if (--len1 == 0) {
@@ -811,7 +836,7 @@ private void mergeHi(int base1, int len1, int base2, int len2) {
811836
*/
812837
do {
813838
assert len1 > 0 && len2 > 1;
814-
if (c.compare(s.getKey(tmp, cursor2), s.getKey(a, cursor1)) < 0) {
839+
if (c.compare(s.getKey(tmp, cursor2, key0), s.getKey(a, cursor1, key1)) < 0) {
815840
s.copyElement(a, cursor1--, a, dest--);
816841
count1++;
817842
count2 = 0;
@@ -833,7 +858,7 @@ private void mergeHi(int base1, int len1, int base2, int len2) {
833858
*/
834859
do {
835860
assert len1 > 0 && len2 > 1;
836-
count1 = len1 - gallopRight(s.getKey(tmp, cursor2), a, base1, len1, len1 - 1, c);
861+
count1 = len1 - gallopRight(s.getKey(tmp, cursor2, key0), a, base1, len1, len1 - 1, c);
837862
if (count1 != 0) {
838863
dest -= count1;
839864
cursor1 -= count1;
@@ -846,7 +871,7 @@ private void mergeHi(int base1, int len1, int base2, int len2) {
846871
if (--len2 == 1)
847872
break outer;
848873

849-
count2 = len2 - gallopLeft(s.getKey(a, cursor1), tmp, 0, len2, len2 - 1, c);
874+
count2 = len2 - gallopLeft(s.getKey(a, cursor1, key0), tmp, 0, len2, len2 - 1, c);
850875
if (count2 != 0) {
851876
dest -= count2;
852877
cursor2 -= count2;

0 commit comments

Comments
 (0)