-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[GRAPHX] Spark 3789 - Python Bindings for GraphX #4205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fcbeee2
0eefa44
c99f81c
207d8ba
2c2cef7
a69a589
cf1df50
1580513
08140bf
13b96d9
a2faa64
28be23e
a23d418
19b280d
d07ae43
e02a8ee
dd9c278
49e3845
44c051f
db8cff0
46dcd9a
3754117
a05d458
5717578
1bbfffa
7297f0e
36d15df
08d4209
577cb4a
6a6b7ec
6b30605
b9e9877
9e8f7db
51e7290
d0a7479
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
""" | ||
Correlations using MLlib. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like the wrong description. |
||
""" | ||
|
||
import sys | ||
|
||
from pyspark import SparkContext | ||
from pyspark.graphx import GraphLoader | ||
from pyspark.graphx import Vertex | ||
from pyspark.graphx import Edge | ||
|
||
if __name__ == "__main__": | ||
|
||
""" | ||
Usage: simpleGraph filename [partitions]" | ||
""" | ||
|
||
sc = SparkContext(appName="PythonSimpleGraphExample") | ||
graphFile = int(sys.argv[1]) if len(sys.argv) > 1 else "simplegraph.edges" | ||
partitions = int(sys.argv[2]) if len(sys.argv) > 2 else 2 | ||
|
||
print "Running SimpleGraph example with filename=%s partitions=%d\n" % (graphFile, partitions) | ||
|
||
graph = GraphLoader.edgeListFile(sc, graphFile, partitions) | ||
vertices = graph.vertices() | ||
edges = graph.edges | ||
|
||
|
||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,12 @@ | |
<artifactId>scalacheck_${scala.binary.version}</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>4.11</version> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this version should be specified here; it should be inherited from the parent POM or set via a configuration or property. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
<build> | ||
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.graphx.api.java | ||
|
||
import java.lang.{Long => JLong} | ||
import java.util.{List => JList} | ||
|
||
import org.apache.spark.api.java.JavaRDD | ||
import org.apache.spark.api.java.function.{Function => JFunction} | ||
import org.apache.spark.graphx._ | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.storage.StorageLevel | ||
|
||
import scala.language.implicitConversions | ||
import scala.reflect.ClassTag | ||
|
||
/** | ||
* EdgeRDD['ED'] is a column-oriented edge partition RDD created from RDD[Edge[ED]]. | ||
* JavaEdgeRDD class provides a Java API to access implementations of the EdgeRDD class | ||
* | ||
* @param targetStorageLevel | ||
* @tparam ED | ||
*/ | ||
class JavaEdgeRDD[ED]( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this class be public? |
||
val edges: RDD[Edge[ED]], | ||
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) | ||
(implicit val classTag: ClassTag[ED]) | ||
extends JavaEdgeRDDLike[ED, JavaEdgeRDD[ED], JavaRDD[(VertexId, VertexId, ED)]] { | ||
|
||
// /** | ||
// * To create JavaEdgeRDD from JavaRDDs of tuples | ||
// * (source vertex id, destination vertex id and edge property class). | ||
// * The edge property class can be Array[Byte] | ||
// * @param jEdges | ||
// */ | ||
// def this(jEdges: JavaRDD[(VertexId, VertexId, ED)]) = { | ||
// this(jEdges.rdd.map(x => Edge[ED](x._1, x._2, x._3))) | ||
// } | ||
|
||
/* Convert RDD[(PartitionID, EdgePartition[ED, VD])] to EdgeRDD[ED, VD] */ | ||
override def edgeRDD = EdgeRDD.fromEdges(edges) | ||
|
||
/** | ||
* Java Wrapper for RDD of Edges | ||
* | ||
* @param edgeRDD | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you're not going to document parameters, just omit them. |
||
* @return | ||
*/ | ||
def wrapRDD(edgeRDD: RDD[Edge[ED]]): JavaRDD[Edge[ED]] = { | ||
JavaRDD.fromRDD(edgeRDD) | ||
} | ||
|
||
/** Persist RDDs of this JavaEdgeRDD with the default storage level (MEMORY_ONLY_SER) */ | ||
def cache(): this.type = { | ||
edges.cache() | ||
this | ||
} | ||
|
||
def collect(): JList[Edge[ED]] = { | ||
import scala.collection.JavaConversions._ | ||
val arr: java.util.Collection[Edge[ED]] = edges.collect().toSeq | ||
new java.util.ArrayList(arr) | ||
} | ||
|
||
/** | ||
* Return a new single long element generated by counting all elements in the vertex RDD | ||
*/ | ||
override def count(): JLong = edges.count() | ||
|
||
/** Return a new VertexRDD containing only the elements that satisfy a predicate. */ | ||
def filter(f: JFunction[Edge[ED], Boolean]): JavaEdgeRDD[ED] = | ||
JavaEdgeRDD(edgeRDD.filter(x => f.call(x).booleanValue())) | ||
|
||
def id: JLong = edges.id.toLong | ||
|
||
/** Persist RDDs of this JavaEdgeRDD with the default storage level (MEMORY_ONLY_SER) */ | ||
def persist(): this.type = { | ||
edges.persist() | ||
this | ||
} | ||
|
||
/** Persist the RDDs of this EdgeRDD with the given storage level */ | ||
def persist(storageLevel: StorageLevel): this.type = { | ||
edges.persist(storageLevel) | ||
this | ||
} | ||
|
||
def unpersist(blocking: Boolean = true) : this.type = { | ||
edgeRDD.unpersist(blocking) | ||
this | ||
} | ||
|
||
override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): JavaEdgeRDD[ED2] = { | ||
JavaEdgeRDD(edgeRDD.mapValues(f)) | ||
} | ||
|
||
override def reverse: JavaEdgeRDD[ED] = JavaEdgeRDD(edgeRDD.reverse) | ||
|
||
def innerJoin[ED2: ClassTag, ED3: ClassTag] | ||
(other: EdgeRDD[ED2]) | ||
(f: (VertexId, VertexId, ED, ED2) => ED3): JavaEdgeRDD[ED3] = { | ||
JavaEdgeRDD(edgeRDD.innerJoin(other)(f)) | ||
} | ||
|
||
def toRDD : RDD[Edge[ED]] = edges | ||
} | ||
|
||
object JavaEdgeRDD { | ||
|
||
implicit def apply[ED: ClassTag](edges: JavaRDD[Edge[ED]]) : JavaEdgeRDD[ED] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Style nit: no space before the |
||
JavaEdgeRDD(EdgeRDD.fromEdges(edges.rdd)) | ||
} | ||
|
||
def toEdgeRDD[ED: ClassTag](edges: JavaEdgeRDD[ED]): RDD[Edge[ED]] = { | ||
JavaEdgeRDD(edges.edgeRDD).toRDD | ||
} | ||
|
||
// def apply[ED: ClassTag]( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mind removing all of the commented-out code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. |
||
// jEdges: JavaRDD[(VertexId, VertexId, ED)]): JavaEdgeRDD[ED] = { | ||
// val edges : RDD[Edge[ED]] = jEdges.rdd.map(x => Edge(x._1, x._2, x._3)) | ||
// new JavaEdgeRDD(edges) | ||
// } | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.graphx.api.java | ||
|
||
import java.lang.{Long => JLong} | ||
import java.util.{List => JList} | ||
|
||
import org.apache.spark.api.java.JavaRDDLike | ||
import org.apache.spark.graphx._ | ||
import org.apache.spark.{Partition, TaskContext} | ||
|
||
import scala.reflect.ClassTag | ||
|
||
trait JavaEdgeRDDLike [ED, This <: JavaEdgeRDDLike[ED, This, R], | ||
R <: JavaRDDLike[(VertexId, VertexId, ED), R]] | ||
extends Serializable { | ||
|
||
def edgeRDD: EdgeRDD[ED] | ||
|
||
def setName() = edgeRDD.setName("JavaEdgeRDD") | ||
|
||
def count() : JLong = edgeRDD.count() | ||
|
||
def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { | ||
edgeRDD.compute(part, context) | ||
} | ||
|
||
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): JavaEdgeRDD[ED2] | ||
|
||
def reverse: JavaEdgeRDD[ED] | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Scalastyle is going to complain for this file (and a few others), since this is missing a blank line at the end of the file. Mind running |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this was just a debugging change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, deleted.