[SPARK-2871] [PySpark] add zipWithIndex() and zipWithUniqueId() #2092


Closed
davies wants to merge 2 commits into master from davies:zipWith

Conversation

@davies
Contributor

@davies davies commented Aug 22, 2014

RDD.zipWithIndex()

    Zips this RDD with its element indices.

    The ordering is first based on the partition index and then the
    ordering of items within each partition. So the first item in
    the first partition gets index 0, and the last item in the last
    partition receives the largest index.

    This method needs to trigger a Spark job when this RDD contains
    more than one partition.

    >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
    [(0, 0), (1, 1), (2, 2), (3, 3)]
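
To make the mechanics concrete, here is a minimal sketch of the general technique (not the actual PySpark implementation): one Spark job counts the elements of every partition, and the resulting offsets are then added to a local running index, yielding (item, index) pairs as in the Scala API.

    def zip_with_index(rdd):
        # One Spark job: count the elements in every partition.
        # (The real method can skip this when there is only one partition.)
        sizes = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

        # Starting index of each partition.
        offsets = [0]
        for size in sizes[:-1]:
            offsets.append(offsets[-1] + size)

        def attach(split, iterator):
            for i, item in enumerate(iterator):
                yield item, offsets[split] + i

        # No shuffle: each partition only adds its own offset.
        return rdd.mapPartitionsWithIndex(attach)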

RDD.zipWithUniqueId()

    Zips this RDD with generated unique Long ids.

    Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
    n is the number of partitions. So there may be gaps in the ids, but
    this method won't trigger a Spark job, unlike L{zipWithIndex}.

    >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
    [(0, 0), (2, 1), (1, 2), (3, 3)]
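
For comparison, a minimal sketch of the id formula described above (again, not the actual PySpark code): the i-th item of partition k gets id i*n + k, so no Spark job is needed to compute offsets.

    def zip_with_unique_id(rdd):
        n = rdd.getNumPartitions()

        def attach(split, iterator):
            # Ids in partition `split` are: split, n + split, 2*n + split, ...
            for i, item in enumerate(iterator):
                yield item, i * n + split

        return rdd.mapPartitionsWithIndex(attach)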

@SparkQA

SparkQA commented Aug 22, 2014

QA tests have started for PR 2092 at commit 0d2a128.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 22, 2014

QA tests have finished for PR 2092 at commit 0d2a128.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mattf

mattf commented Aug 23, 2014

are you going to add tests for these?

@davies
Contributor Author

davies commented Aug 23, 2014

I think doc tests should be enough.

@mattf

mattf commented Aug 23, 2014

fair enough

+1 lgtm

more than one partition.

>>> sc.parallelize(range(4), 2).zipWithIndex().collect()
[(0, 0), (1, 1), (2, 2), (3, 3)]
Contributor

This isn't the best example because it's not clear which element is the item and which element is its index. In the Scala API, this is clear from the method's return type. Maybe we should update the documentation to explicitly state that the second element is the id (like the Scala API).

I think this implementation has things backwards w.r.t. the Scala one:

>>> sc.parallelize(['a', 'b', 'c', 'd'], 2).zipWithIndex().collect()
[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]

versus

scala> sc.parallelize(Seq('a', 'b', 'c', 'd')).zipWithIndex().collect()
res0: Array[(Char, Long)] = Array((a,0), (b,1), (c,2), (d,3))

Contributor Author

I will change it.
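
(For reference: after the follow-up commit cebe5bf, "reverse the order of index", the PySpark output should presumably match the Scala convention, with the item first and the index second:)

    >>> sc.parallelize(['a', 'b', 'c', 'd'], 2).zipWithIndex().collect()
    [('a', 0), ('b', 1), ('c', 2), ('d', 3)]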

@davies
Contributor Author

davies commented Aug 24, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Aug 24, 2014

QA tests have started for PR 2092 at commit cebe5bf.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 24, 2014

QA tests have finished for PR 2092 at commit cebe5bf.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class BoundedFloat(float):
    • class JoinedRow2 extends Row
    • class JoinedRow3 extends Row
    • class JoinedRow4 extends Row
    • class JoinedRow5 extends Row
    • class GenericRow(protected[sql] val values: Array[Any]) extends Row
    • abstract class MutableValue extends Serializable
    • final class MutableInt extends MutableValue
    • final class MutableFloat extends MutableValue
    • final class MutableBoolean extends MutableValue
    • final class MutableDouble extends MutableValue
    • final class MutableShort extends MutableValue
    • final class MutableLong extends MutableValue
    • final class MutableByte extends MutableValue
    • final class MutableAny extends MutableValue
    • final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression

@JoshRosen
Contributor

LGTM, so I've merged this into master. Thanks!

@asfgit asfgit closed this in fb0db77 Aug 25, 2014
@JoshRosen
Contributor

I also merged this into branch-1.1, since it's an often-requested feature and only adds code to this file (so it's low-risk to merge).

asfgit pushed a commit that referenced this pull request Aug 25, 2014
RDD.zipWithIndex()

        Zips this RDD with its element indices.

        The ordering is first based on the partition index and then the
        ordering of items within each partition. So the first item in
        the first partition gets index 0, and the last item in the last
        partition receives the largest index.

        This method needs to trigger a Spark job when this RDD contains
        more than one partition.

        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
        [(0, 0), (1, 1), (2, 2), (3, 3)]

RDD.zipWithUniqueId()

        Zips this RDD with generated unique Long ids.

        Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
        n is the number of partitions. So there may be gaps in the ids, but
        this method won't trigger a Spark job, unlike L{zipWithIndex}.

        >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
        [(0, 0), (2, 1), (1, 2), (3, 3)]

Author: Davies Liu <davies.liu@gmail.com>

Closes #2092 from davies/zipWith and squashes the following commits:

cebe5bf [Davies Liu] improve test cases, reverse the order of index
0d2a128 [Davies Liu] add zipWithIndex() and zipWithUniqueId()

(cherry picked from commit fb0db77)
Signed-off-by: Josh Rosen <joshrosen@apache.org>
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
@davies davies deleted the zipWith branch September 15, 2014 22:19