
[GRAPHX] Spark 3789 - Python Bindings for GraphX #4205


Closed
kdatta wants to merge 35 commits
35 commits
fcbeee2
SPARK-3789: initial commit
kdatta Oct 6, 2014
0eefa44
SPARK-3789: added graph, vertex and edge python files
kdatta Oct 6, 2014
c99f81c
SPARK-3789: Added PythonGraphLoader
kdatta Oct 7, 2014
207d8ba
SPARK-3789: Removed PythonGraphLoader. Added java_import statement to…
kdatta Oct 13, 2014
2c2cef7
SPARK-3789: WIP - Added JavaVertexRDD, JavaEdgeRDD and the first few …
kdatta Oct 30, 2014
a69a589
SPARK-3789: Merging master on 10/30/2014
kdatta Oct 30, 2014
cf1df50
SPARK-3789: Removed .pyc files
kdatta Oct 30, 2014
1580513
SPARK-3789: WIP - PythonVertexRDD works
kdatta Nov 4, 2014
08140bf
SPARK-3789: WIP - 11/6/2014
kdatta Nov 6, 2014
13b96d9
Merge branch 'master' into SPARK-3789
kdatta Nov 6, 2014
a2faa64
SPARK-3789: WIP
kdatta Nov 12, 2014
28be23e
SPARK-3789: WIP
kdatta Nov 19, 2014
a23d418
SPARK-3789: WIP
kdatta Nov 19, 2014
19b280d
SPARK-3789: Updated vertex.py, edge.py and graph.py
kdatta Nov 19, 2014
d07ae43
SPARK-3789: JavaEdgeRDDLike compiler errors.
kdatta Dec 10, 2014
e02a8ee
SPARK-3789: JavaEdgeRDDLike compiler errors.
kdatta Dec 16, 2014
dd9c278
SPARK-3789: Merging master on 12/16
kdatta Dec 16, 2014
49e3845
SPARK-3789: Updated JavaEdgeRDD class according to EdgeRDD and EdgeRD…
kdatta Dec 18, 2014
44c051f
SPARK-3789: Merging master on 12/18
kdatta Dec 18, 2014
db8cff0
SPARK-3789: temp commit before merging master on 12/22
kdatta Dec 22, 2014
46dcd9a
SPARK-3789: Merging master on 12/22
kdatta Dec 22, 2014
3754117
SPARK-3789: temp commit before merging master on 1/5
kdatta Jan 5, 2015
a05d458
SPARK-3789: Merge master on 1/5
kdatta Jan 5, 2015
5717578
SPARK-3789: temp commit before merging master on 1/7
kdatta Jan 7, 2015
1bbfffa
SPARK-3789: temp commit before merging master on 1/16
kdatta Jan 16, 2015
7297f0e
SPARK-3789: Merging master on 1/16
kdatta Jan 16, 2015
36d15df
SPARK-3789: collect(), take() fixed
kdatta Jan 16, 2015
08d4209
SPARK-3789: temp commit before merging master on 1/18
kdatta Jan 19, 2015
577cb4a
SPARK-3789: Merge master on 1/18
kdatta Jan 19, 2015
6a6b7ec
SPARK-3789: filter in VertexRDD fixed
kdatta Jan 21, 2015
6b30605
SPARK-3789: Merged master on 1/21
kdatta Jan 21, 2015
b9e9877
SPARK-3789: Following methods are complete in VertexRDD(vertex.py), P…
kdatta Jan 22, 2015
9e8f7db
SPARK-3789: innerJoin fixed in vertexrdd
kdatta Jan 23, 2015
51e7290
SPARK-3789: EdgeRDD bugs fixed
kdatta Jan 26, 2015
d0a7479
SPARK-3789: Merging master on 1/26
kdatta Jan 26, 2015
3 changes: 3 additions & 0 deletions bin/spark-class
@@ -170,6 +170,9 @@ export CLASSPATH
# the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM
# to prepare the launch environment of this driver JVM.

export JAVA_OPTS+=" -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
Contributor:

It looks like this was just a debugging change?

Author:

Yes, deleted.

echo $JAVA_OPTS

if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
# This is used only if the properties file actually contains these special configs
# Export the environment variables needed by SparkSubmitDriverBootstrapper
23 changes: 11 additions & 12 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,32 +19,30 @@ package org.apache.spark.api.python

import java.io._
import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}

import org.apache.spark.input.PortableDataStream

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}

import com.google.common.base.Charsets.UTF_8

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
import org.apache.spark._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials

private[spark] class PythonRDD(
@transient parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
preservePartitoning: Boolean,
preservePartitioning: Boolean,
pythonExec: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]])
@@ -55,9 +53,10 @@ private[spark] class PythonRDD(

override def getPartitions = firstParent.partitions

override val partitioner = if (preservePartitoning) firstParent.partitioner else None
override val partitioner = if (preservePartitioning) firstParent.partitioner else None

override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {

val startTime = System.currentTimeMillis
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(
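For context on the preservePartitioning flag fixed above, a minimal sketch of why keeping the parent's partitioner matters (assumes an existing SparkContext `sc`; the RDDs are made up):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._  // pair-RDD implicits, 1.x style

// Sketch: a key-preserving transformation can keep the parent's partitioner,
// letting a later reduceByKey/join on the same keys skip a shuffle.
val pairs = sc.parallelize(1 to 100)
  .map(i => (i % 10, i))
  .partitionBy(new HashPartitioner(4))

val kept    = pairs.mapValues(_ + 1)                  // partitioner preserved
val dropped = pairs.map { case (k, v) => (k, v + 1) } // partitioner lost

assert(kept.partitioner == pairs.partitioner)
assert(dropped.partitioner.isEmpty)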
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler,
SamplingUtils}
SamplingUtils}

/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -1378,7 +1378,7 @@ abstract class RDD[T: ClassTag](
def toDebugString: String = {
// Get a debug description of an rdd without its children
def debugSelf (rdd: RDD[_]): Seq[String] = {
import Utils.bytesToString
import org.apache.spark.util.Utils.bytesToString

val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
48 changes: 48 additions & 0 deletions examples/src/main/python/graphx/simpleGraph.py
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Correlations using MLlib.
Contributor:

This looks like the wrong description.

"""

import sys

from pyspark import SparkContext
from pyspark.graphx import GraphLoader
from pyspark.graphx import Vertex
from pyspark.graphx import Edge

if __name__ == "__main__":

"""
Usage: simpleGraph filename [partitions]"
"""

sc = SparkContext(appName="PythonSimpleGraphExample")
graphFile = int(sys.argv[1]) if len(sys.argv) > 1 else "simplegraph.edges"
partitions = int(sys.argv[2]) if len(sys.argv) > 2 else 2

print "Running SimpleGraph example with filename=%s partitions=%d\n" % (graphFile, partitions)

graph = GraphLoader.edgeListFile(sc, graphFile, partitions)
vertices = graph.vertices()
edges = graph.edges
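For reference, a minimal sketch of the equivalent calls through the existing Scala GraphX API that these Python bindings wrap (assumes an existing SparkContext `sc`; the parameter name `numEdgePartitions` follows the 1.x Scala signature):

import org.apache.spark.graphx.GraphLoader

// Sketch only: the Scala-side calls the Python example above mirrors.
val graph    = GraphLoader.edgeListFile(sc, "simplegraph.edges", numEdgePartitions = 2)
val vertices = graph.vertices  // VertexRDD[Int]
val edges    = graph.edges     // EdgeRDD[Int]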
6 changes: 6 additions & 0 deletions graphx/pom.xml
@@ -50,6 +50,12 @@
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
Contributor:
I don't think this version should be specified here; it should be inherited from the parent POM or set via a configuration or property.

Author:
Removed

<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -85,7 +85,7 @@ object PartitionStrategy {
}

/**
* Assigns edges to partitions using only the source vertex ID, colocating edges with the same
* Assigns edges to partitions using only the source vertex ID, collocating edges with the same
* source.
*/
case object EdgePartition1D extends PartitionStrategy {
@@ -98,7 +98,7 @@

/**
* Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
* random vertex cut that colocates all same-direction edges between two vertices.
* random vertex cut that collocates all same-direction edges between two vertices.
*/
case object RandomVertexCut extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
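For intuition, a simplified sketch of what these two strategies compute (illustrative only; the real EdgePartition1D additionally mixes the source ID with a large prime to spread clustered vertex IDs):

// Simplified partition functions (illustration, not the actual implementation):
def edgePartition1D(src: Long, numParts: Int): Int =
  (math.abs(src) % numParts).toInt            // all edges sharing a source land together

def randomVertexCut(src: Long, dst: Long, numParts: Int): Int =
  math.abs((src, dst).hashCode()) % numParts  // same-direction src->dst edges co-partition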
graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaEdgeRDD.scala
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.graphx.api.java

import java.lang.{Long => JLong}
import java.util.{List => JList}

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import scala.language.implicitConversions
import scala.reflect.ClassTag

/**
* EdgeRDD[ED] is a column-oriented edge-partition RDD created from RDD[Edge[ED]].
* JavaEdgeRDD provides a Java-friendly API over the EdgeRDD implementation.
*
* @param targetStorageLevel
* @tparam ED
*/
class JavaEdgeRDD[ED](
Contributor:
Should this class be public?

val edges: RDD[Edge[ED]],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
(implicit val classTag: ClassTag[ED])
extends JavaEdgeRDDLike[ED, JavaEdgeRDD[ED], JavaRDD[(VertexId, VertexId, ED)]] {

// /**
// * To create JavaEdgeRDD from JavaRDDs of tuples
// * (source vertex id, destination vertex id and edge property class).
// * The edge property class can be Array[Byte]
// * @param jEdges
// */
// def this(jEdges: JavaRDD[(VertexId, VertexId, ED)]) = {
// this(jEdges.rdd.map(x => Edge[ED](x._1, x._2, x._3)))
// }

/* Convert RDD[(PartitionID, EdgePartition[ED, VD])] to EdgeRDD[ED, VD] */
override def edgeRDD = EdgeRDD.fromEdges(edges)

/**
* Java Wrapper for RDD of Edges
*
* @param edgeRDD
Contributor:
If you're not going to document parameters, just omit them.

* @return
*/
def wrapRDD(edgeRDD: RDD[Edge[ED]]): JavaRDD[Edge[ED]] = {
JavaRDD.fromRDD(edgeRDD)
}

/** Persist the RDDs of this JavaEdgeRDD with the default storage level (MEMORY_ONLY). */
def cache(): this.type = {
edges.cache()
this
}

def collect(): JList[Edge[ED]] = {
import scala.collection.JavaConversions._
val arr: java.util.Collection[Edge[ED]] = edges.collect().toSeq
new java.util.ArrayList(arr)
}

/**
* Return the number of elements in the edge RDD as a single java.lang.Long
*/
override def count(): JLong = edges.count()

/** Return a new JavaEdgeRDD containing only the edges that satisfy a predicate. */
def filter(f: JFunction[Edge[ED], Boolean]): JavaEdgeRDD[ED] =
JavaEdgeRDD(edgeRDD.filter(x => f.call(x).booleanValue()))

def id: JLong = edges.id.toLong

/** Persist the RDDs of this JavaEdgeRDD with the default storage level (MEMORY_ONLY). */
def persist(): this.type = {
edges.persist()
this
}

/** Persist the RDDs of this EdgeRDD with the given storage level */
def persist(storageLevel: StorageLevel): this.type = {
edges.persist(storageLevel)
this
}

def unpersist(blocking: Boolean = true) : this.type = {
edgeRDD.unpersist(blocking)
this
}

override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): JavaEdgeRDD[ED2] = {
JavaEdgeRDD(edgeRDD.mapValues(f))
}

override def reverse: JavaEdgeRDD[ED] = JavaEdgeRDD(edgeRDD.reverse)

def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
(f: (VertexId, VertexId, ED, ED2) => ED3): JavaEdgeRDD[ED3] = {
JavaEdgeRDD(edgeRDD.innerJoin(other)(f))
}

def toRDD : RDD[Edge[ED]] = edges
}

object JavaEdgeRDD {

implicit def apply[ED: ClassTag](edges: JavaRDD[Edge[ED]]) : JavaEdgeRDD[ED] = {
Contributor:
Style nit: no space before the :.

new JavaEdgeRDD(EdgeRDD.fromEdges(edges.rdd)) // `new` avoids re-entering this apply via the implicit conversion
}

def toEdgeRDD[ED: ClassTag](edges: JavaEdgeRDD[ED]): RDD[Edge[ED]] = {
JavaEdgeRDD(edges.edgeRDD).toRDD
}

// def apply[ED: ClassTag](
Contributor:
Mind removing all of the commented-out code?

Author:
Removed.

// jEdges: JavaRDD[(VertexId, VertexId, ED)]): JavaEdgeRDD[ED] = {
// val edges : RDD[Edge[ED]] = jEdges.rdd.map(x => Edge(x._1, x._2, x._3))
// new JavaEdgeRDD(edges)
// }
}

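For context, a minimal sketch of how this wrapper might be exercised from Scala using only the methods defined in the diff above (assumes an existing SparkContext `sc`; the sample edges are made up):

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.api.java.JavaEdgeRDD

val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "likes")))

// The companion's implicit apply wraps a JavaRDD[Edge[ED]] into a JavaEdgeRDD.
val javaEdges: JavaEdgeRDD[String] = JavaEdgeRDD(JavaRDD.fromRDD(edges))

javaEdges.count()                                            // 2, as a java.lang.Long
val upper    = javaEdges.mapValues(e => e.attr.toUpperCase)  // JavaEdgeRDD[String]
val reversed = javaEdges.reverse                             // src/dst swapped on every edge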
graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaEdgeRDDLike.scala
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.graphx.api.java

import java.lang.{Long => JLong}
import java.util.{List => JList}

import org.apache.spark.api.java.JavaRDDLike
import org.apache.spark.graphx._
import org.apache.spark.{Partition, TaskContext}

import scala.reflect.ClassTag

trait JavaEdgeRDDLike [ED, This <: JavaEdgeRDDLike[ED, This, R],
R <: JavaRDDLike[(VertexId, VertexId, ED), R]]
extends Serializable {

def edgeRDD: EdgeRDD[ED]

def setName() = edgeRDD.setName("JavaEdgeRDD")

def count() : JLong = edgeRDD.count()

def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
edgeRDD.compute(part, context)
}

def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): JavaEdgeRDD[ED2]

def reverse: JavaEdgeRDD[ED]
}
Contributor:
Scalastyle is going to complain for this file (and a few others), since this is missing a blank line at the end of the file. Mind running sbt/sbt scalastyle and fixing the errors?
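As an aside for readers new to the `This <: JavaEdgeRDDLike[ED, This, R]` bound in the trait above: it is the usual F-bounded pattern (also used by the core Java API's JavaRDDLike) that lets shared methods return the concrete subtype. A self-contained sketch with hypothetical names:

// Hypothetical illustration of the F-bounded pattern; not PR code.
trait Wrapper[A, This <: Wrapper[A, This]] { self: This =>
  def label: String
  // Returns the concrete subtype, not the bare trait:
  def tagged(prefix: String): This = { println(s"$prefix/$label"); self }
}

class EdgeWrapper(val label: String) extends Wrapper[String, EdgeWrapper]

val w: EdgeWrapper = new EdgeWrapper("edges").tagged("graphx")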
