[SPARK 5280] RDF Loader added + documentation #4650

Closed
wants to merge 40 commits into from
Commits (40)
10436d2
fast forward from upstream
Feb 3, 2015
595aed0
dictionary builder done
Feb 3, 2015
c239902
[SPARK 5280]
Feb 3, 2015
f14e483
done dictionary version
Feb 3, 2015
43cc53a
[SPARK 5280] rdfloader using hashes as VertexIds
Feb 3, 2015
2e1220d
cleaned up + fixed style
Feb 4, 2015
54e2c6e
made custom 64bit hash
Feb 4, 2015
b454560
proper
Feb 4, 2015
45a9f57
fast forward from upstream
Feb 3, 2015
6ee9a2b
dictionary builder done
Feb 3, 2015
45c2216
[SPARK 5280]
Feb 3, 2015
fa5c0da
done dictionary version
Feb 3, 2015
c036f98
[SPARK 5280] rdfloader using hashes as VertexIds
Feb 3, 2015
5755379
cleaned up + fixed style
Feb 4, 2015
e00123e
made custom 64bit hash
Feb 4, 2015
6af9a7a
proper
Feb 4, 2015
1ee34c9
Merge branch 'master' of github.com:lukovnikov/spark into rdfloaderhash
Feb 4, 2015
9000a47
Merge branch 'rdfloaderhash' of github.com:lukovnikov/spark into rdfl…
Feb 4, 2015
70eb725
RDF Loader with hash, tested on small RDF dumps (more tests in progress)
Feb 4, 2015
4398d93
added documentation for RDFLoader
lukovnikov Feb 4, 2015
273a1b3
small update to RDFLoader description
lukovnikov Feb 4, 2015
202ccf8
sdf
Feb 4, 2015
2d990ce
fast forward from upstream
Feb 3, 2015
4a9b622
Merge branch 'master' of github.com:lukovnikov/spark
Feb 4, 2015
062996c
Merge branch 'rdfloaderhash'
Feb 4, 2015
121bf14
[SPARK 5280]
Feb 4, 2015
67ada51
Merge branch 'rdfloaderhash' of github.com:lukovnikov/spark into rdfl…
Feb 4, 2015
e5fcf75
Merge branch 'rdfloaderhash'
Feb 4, 2015
c5960af
Merge remote-tracking branch 'upstream/master'
Feb 4, 2015
91361f3
undone unnecessary changes
Feb 5, 2015
686af40
style fixed
Feb 5, 2015
cbc720f
tested
Feb 11, 2015
80d9b72
added dictionary getter
Feb 11, 2015
4014c7f
style errors fixed
Feb 18, 2015
04df47a
Merge remote-tracking branch 'upstream/master'
Feb 21, 2015
0421112
added test
Feb 21, 2015
1bec795
test style better
Feb 21, 2015
b658c55
revert pom
Feb 21, 2015
3db73ab
source file closed
Feb 21, 2015
4daa6e9
changed resource to .data
Feb 21, 2015
6 changes: 6 additions & 0 deletions docs/graphx-programming-guide.md
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ description: GraphX graph processing library guide for Spark SPARK_VERSION_SHORT
[GraphOps.pregel]: api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
[PartitionStrategy]: api/scala/index.html#org.apache.spark.graphx.PartitionStrategy$
[GraphLoader.edgeListFile]: api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]
[RDFLoader.loadNTriples]: api/scala/index.html#org.apache.spark.graphx.loaders.RDFLoader@loadNTriples(SparkContext,String,Int,Int):Graph[String,String]
[Graph.apply]: api/scala/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
[Graph.fromEdgeTuples]: api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
[Graph.fromEdges]: api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
@@ -875,6 +876,11 @@ object Graph {

<a name="vertex_and_edge_rdds"></a>

## RDF Graph Builder
[`RDFLoader.loadNTriples`][RDFLoader.loadNTriples] loads an RDF graph from an N-Triples (.nt) dump into a `Graph[String,String]`.
Both resource nodes and literal nodes are mapped to vertices, with the URI or the literal value (kept verbatim) as the `String` attribute. Each vertex is assigned an id by applying a simple hash function to the URI of the resource node, or, for a literal node, to its value prefixed with the subject and property URIs of the triple in which it occurs. Literal nodes that are identical in value but come from different triples therefore receive separate vertices in the resulting `Graph[String, String]`.
Each valid triple is mapped to an `Edge[String]` carrying the RDF property URI as its `String` value.
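The id-assignment scheme described above can be sketched in a few lines of plain Scala. The `gethash` body is reproduced from the loader in this pull request; the example URIs and the literal are illustrative, and the prefixed-literal key format mirrors how the loader builds it:

```scala
// Reproduction of the loader's 64-bit polynomial string hash.
def gethash(in: String): Long = {
  var h = 1125899906842597L // prime seed
  for (x <- in) h = 31 * h + x
  h
}

// A resource node is keyed by its URI alone.
val resourceId = gethash("http://dbpedia.org/resource/Aristotle")

// A literal node is keyed by its value prefixed with the subject and
// property URIs of its triple, so equal literals occurring in different
// triples get distinct vertex ids.
val lit1 = gethash("<http://dbpedia.org/resource/Aristotle-http://xmlns.com/foaf/0.1/name> \"Aristotle\"@en")
val lit2 = gethash("<http://dbpedia.org/resource/Autism-http://xmlns.com/foaf/0.1/name> \"Aristotle\"@en")
assert(lit1 != lit2)
```

Note that the hash is deterministic but not collision-free; distinct URIs can in principle map to the same `VertexId`.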

# Vertex and Edge RDDs

GraphX exposes `RDD` views of the vertices and edges stored within the graph. However, because
122 changes: 122 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/loaders/RDFLoader.scala
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.loaders


import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
 * Provides utilities for loading RDF [[Graph]]s from N-Triples (.nt) dumps.
*/
object RDFLoader extends Logging {

  // <subject> <property> <object> .   -- object is a resource
  val relregex = "^<([^>]+)>\\s<([^>]+)>\\s<([^>]+)>\\s\\.$".r
  // <subject> <property> "literal" .  -- object is a literal
  val propregex = "^<([^>]+)>\\s<([^>]+)>\\s(.+)\\s\\.$".r
  // splits a prefixed literal key back into its prefix and value
  val propvalregex = "^<([^>]+)>\\s(.+)$".r

/**
* Transforms an RDF dump in ntriples (.nt) format into a graph.
* Uses a simple hash function to generate VertexIds.
* All subject and object values of triples are represented as a vertex with
* the hash as VertexId and the URI (for resources) or the value (for literals)
* as the associated String value for that vertex.
 * Each edge connects the subject vertex to the object vertex and carries
 * the relation URI as its label.
 * Literal values that are identical in value but occur in different triples
 * are mapped to different vertices.
 *
 * @param sc SparkContext
 * @param path the path to the file (e.g., /home/data/file or hdfs://file)
* @param edgeStorageLevel the desired storage level for the edge partitions
* @param vertexStorageLevel the desired storage level for the vertex partitions
*/
  def loadNTriples(
      sc: SparkContext,
      path: String,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[String, String] = {

val lines = sc.textFile(path)

    val vertices = lines
      .flatMap {
        case relregex(subj, rel, obj) => Set(subj, obj)
        case propregex(subj, rel, value) => Set(subj, "<" + subj + "-" + rel + "> " + value)
        case _ => Set.empty[String]
      }
      .distinct()
      .map {
        case propvalregex(pre, value) => (gethash("<" + pre + "> " + value), value)
        case name => (gethash(name), name)
      }
      .persist(vertexStorageLevel) // TODO: set name etc

    val edges: RDD[Edge[String]] = lines
      .map {
        case relregex(subj, rel, obj) => Edge(gethash(subj), gethash(obj), rel)
        case propregex(subj, rel, obj) =>
          Edge(gethash(subj), gethash("<" + subj + "-" + rel + "> " + obj), rel)
        case _ => Edge(0L, 0L, "null")
      }
      .persist(edgeStorageLevel) // TODO: set name

    Graph(vertices, edges)
  } // end of loadNTriples


  /**
   * Gets the dictionary for entities (not properties) from the provided graph,
   * mapping each vertex id (hash) to its URI or literal value.
   */
  def getdictionary(graph: Graph[String, String]): RDD[(Long, String)] = {
    graph.vertices.map { case (id, value) => (id.toLong, value) }
  }


  /**
   * Implements a simple deterministic 64-bit polynomial hash for Strings
   * (the classic 31-multiplier scheme seeded with a large prime).
   */
  def gethash(in: String): Long = {
    var h = 1125899906842597L // prime seed
    for (x <- in) {
      h = 31 * h + x
    }
    h
  }

}

17 changes: 17 additions & 0 deletions graphx/src/test/resources/graph_nt.data
@@ -0,0 +1,17 @@
<http://dbpedia.org/resource/Autism> <http://dbpedia.org/ontology/diseasesdb> "1142"@en .
<http://dbpedia.org/resource/Autism> <http://dbpedia.org/ontology/icd9> "299.00"@en .
<http://dbpedia.org/resource/Autism> <http://dbpedia.org/ontology/omim> "209850"^^<http://www.w3.org/2001/XMLSchema#integer> .
<http://dbpedia.org/resource/Autism> <http://dbpedia.org/ontology/medlineplus> "001526"@en .
<http://dbpedia.org/resource/Autism> <http://dbpedia.org/ontology/emedicineSubject> "med"@en .
<http://dbpedia.org/resource/Autism> <http://dbpedia.org/ontology/emedicineTopic> "3202"@en .
<http://dbpedia.org/resource/Autism> <http://dbpedia.org/ontology/meshId> "D001321"@en .
<http://dbpedia.org/resource/Autism> <http://xmlns.com/foaf/0.1/name> "Autism"@en .
<http://dbpedia.org/resource/Aristotle> <http://xmlns.com/foaf/0.1/name> "Aristotle"@en .
<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/birthYear> "-0384"^^<http://www.w3.org/2001/XMLSchema#gYear> .
<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/deathYear> "-0322"^^<http://www.w3.org/2001/XMLSchema#gYear> .
<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/region> <http://dbpedia.org/resource/Western_philosophy> .
<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/era> <http://dbpedia.org/resource/Ancient_philosophy> .
<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/philosophicalSchool> <http://dbpedia.org/resource/Peripatetic_school> .
<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/philosophicalSchool> <http://dbpedia.org/resource/Aristotelianism> .
<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/mainInterest> <http://dbpedia.org/resource/Physics> .
<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/mainInterest> <http://dbpedia.org/resource/Metaphysics> .
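The loader's line classification can be checked in isolation against lines like the ones in the data file above. The two regexes are copied from the `RDFLoader` in this pull request; the sample lines come from the data file:

```scala
// Line patterns copied from the RDFLoader in this pull request.
val relregex = "^<([^>]+)>\\s<([^>]+)>\\s<([^>]+)>\\s\\.$".r
val propregex = "^<([^>]+)>\\s<([^>]+)>\\s(.+)\\s\\.$".r

// A triple whose object is a resource matches relregex,
// with the angle brackets stripped from each captured URI.
val obj = "<http://dbpedia.org/resource/Aristotle> <http://dbpedia.org/ontology/era> <http://dbpedia.org/resource/Ancient_philosophy> ." match {
  case relregex(s, p, o) => o
  case _ => ""
}

// A triple whose object is a literal matches propregex instead,
// with the literal (quotes and language tag included) kept verbatim.
val lit = "<http://dbpedia.org/resource/Autism> <http://xmlns.com/foaf/0.1/name> \"Autism\"@en ." match {
  case propregex(s, p, v) => v
  case _ => ""
}
```

Note that `propregex` would also match a resource-object line (its `(.+)` group accepts anything), which is why the loader tries `relregex` first.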
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.graphx.loaders

import org.scalatest.FunSuite
import org.apache.spark.graphx.LocalSparkContext
import scala.io.Source

class RDFLoaderSuite extends FunSuite with LocalSparkContext {
test("RDFLoader.loadNTriples") {
withSpark { sc =>
val file = getClass.getResource("/graph_nt.data").getFile
val graph = RDFLoader.loadNTriples(sc, file)
val edges = graph.edges.collect()
assert(edges.length == 17)
val vertices = graph.vertices.collect()
assert(vertices.length == 19)
val triples = graph.mapTriplets(triplet => {
triplet.srcAttr + "<" + triplet.attr + ">" + triplet.dstAttr
}).edges.collect
val propregex = "<([^>]+)>\\s<([^>]+)>\\s(.+)\\s\\.".r
val relregex = "<([^>]+)>\\s<([^>]+)>\\s<([^>]+)>\\s\\.".r

      // checks that a triple with attribute string a<b>c exists in the graph
      def assertline(a: String, b: String, c: String) = {
        val found = triples.exists(_.attr == a + "<" + b + ">" + c)
        if (!found) Console.println(a, b, c)
        assert(found)
      }
val original = Source.fromFile(file)
for (line <- original.getLines) {
line match {
case relregex(a, b, c) => assertline(a, b, c)
case propregex(a, b, c) => assertline(a, b, c)
case _ =>
}
}
      original.close()
}
}
}