Spark 615 map partitions with index callable from java #16
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -245,6 +245,24 @@ public void mapPartitions() { | |
Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); | ||
} | ||
|
||
@Test | ||
public void mapPartitionsWithIndex() { | ||
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); | ||
JavaRDD<Integer> rddByIndex = rdd.mapPartitionsWithIndex((start, iter) -> { | ||
List<Integer> list = new ArrayList<Integer>(); | ||
int sum = 0; | ||
int pos = start; | ||
while (iter.hasNext()) { | ||
sum += (pos * iter.next()); | ||
pos += 1; | ||
Indents seem messed up here, should both be 2 spaces
Also space before |
||
} | ||
return list.iterator(); | ||
}); | ||
Assert.assertEquals(0, rddByIndex.first().intValue()); | ||
Integer[] values = {0, 2, 6, 12, 20}; | ||
Assert.assertEquals(Arrays.asList(values), rddByIndex.collect()); | ||
Actually this test is just broken as is, there's no |
||
} | ||
|
||
@Test | ||
public void sequenceFile() { | ||
File tempDir = Files.createTempDir(); | ||
|
Just wondering - why isn't this version callable from Java? It seems better to me not to define a new `Function` class... I'm trying to understand why the existing one can't be called as-is (modulo the classtag stuff).
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it was just the classtag stuff. The new function class was for convience and in keeping with the other functions being defined (FlatMapFunction etc.). I can get of the new function class if you want.
Hey @holdenk, now that I look more, I'm confused, because it looks like `FlatMapFunction` is used for e.g. mapPartitions despite its name. I'll have to defer to @mateiz on this one, who has looked at the Java API more recently.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I agree, did you try using the old one? It lacked a test but maybe it can work. Throughout the Java API we only pass the ClassTag for java.lang.Object, we don't try to pass a real class tag.
I tried calling the old one, but that was before the big Java API refactoring, and it didn't work.
Try updating this to just take a `Function2<Integer, java.util.Iterator<A>, java.util.Iterator<B>>`. I'm pretty sure it would work and we won't need a new type of class.
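For illustration, here is a rough sketch of what a caller might look like if the method just took a two-argument function of (partition index, iterator) returning an iterator. It does not use Spark at all: the `Function2` interface below is a local, hypothetical stand-in (unlike Spark's, its `call` declares no checked exception), and `mapPartitionsWithIndex` is a hypothetical helper that walks an in-memory list of partitions. It only shows the shape of the lambda, not the actual change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MapPartitionsWithIndexSketch {

  // Hypothetical stand-in for a two-argument function type; NOT Spark's interface.
  interface Function2<T1, T2, R> {
    R call(T1 t1, T2 t2);
  }

  // Hypothetical helper: calls f once per partition with (index, iterator)
  // and concatenates the results in partition order.
  static <A, B> List<B> mapPartitionsWithIndex(
      List<List<A>> partitions,
      Function2<Integer, Iterator<A>, Iterator<B>> f) {
    List<B> out = new ArrayList<>();
    for (int i = 0; i < partitions.size(); i++) {
      Iterator<B> it = f.call(i, partitions.get(i).iterator());
      while (it.hasNext()) {
        out.add(it.next());
      }
    }
    return out;
  }

  // Emits one value per partition: partition index times the partition's sum.
  static List<Integer> indexTimesSum() {
    // Same data as the test above: 1..4 split into two partitions.
    List<List<Integer>> partitions =
        Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4));
    return mapPartitionsWithIndex(partitions, (index, iter) -> {
      int sum = 0;
      while (iter.hasNext()) {
        sum += iter.next();
      }
      List<Integer> result = new ArrayList<>();
      result.add(index * sum);
      return result.iterator();
    });
  }

  public static void main(String[] args) {
    System.out.println(indexTimesSum()); // prints [0, 7]
  }
}
```

With the partitions `[1, 2]` and `[3, 4]`, the partition sums are 3 and 7 (matching the existing `mapPartitions` test), so multiplying by the indices 0 and 1 gives `[0, 7]`.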