
Commit faa4073

GraphX benchmarking changes for VLDB
1. In-memory shuffle
2. Checking Iterator.hasNext
3. In-place/destructive updates in vertex replication
4. WikiPipelineBenchmark
5. K-Core
6. Dataflow (naive Spark) PageRank

Squashed commit of the following (authors: Ankur Dave <ankurdave@gmail.com>, Dan Crankshaw <dscrankshaw@gmail.com>, root <root@ip-10-78-213-7.ec2.internal>):

ad6590c  Ankur Dave     Mon Apr 21 17:56:24 2014 -0700  Add unpersist option to Pregel
3bcaa2f  Dan Crankshaw  Wed Apr 30 05:58:11 2014 +0000  Added faster version of kcore and fixed compile issue with dataflow pagerank
03f5d76  Dan Crankshaw  Wed Apr 30 02:22:13 2014 +0000  Added more connected components.
cfd209b  Dan Crankshaw  Wed Apr 30 00:01:24 2014 +0000  Initial CC dataflow implementation.
255db45  Dan Crankshaw  Tue Apr 29 19:38:26 2014 +0000  Updated pregel logging.
5032924  Dan Crankshaw  Tue Apr 29 08:52:07 2014 +0000  Added dataflow pagerank. Having issues with it and in-memory shuffle.
f483ca4  Ankur Dave     Tue Apr 29 00:05:49 2014 -0700  Merge pull request apache#3 from dcrankshaw/osdi_with_kcore_for_merge (merge: 8d22500 9e59642) "Osdi with kcore for merge"
9e59642  Dan Crankshaw  Mon Apr 28 20:51:24 2014 +0000  Added more logging. (Conflicts: graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala)
76a6a54  Dan Crankshaw  Sun Apr 13 03:00:43 2014 +0000  Fixed kcore. now works.
1e924df  Dan Crankshaw  Sun Apr 13 00:06:40 2014 +0000  Changed kcore filenames.
a641dc1  Dan Crankshaw  Sat Apr 12 01:59:17 2014 +0000  Kcore debugging.
dbe5180  Dan Crankshaw  Fri Apr 11 22:49:38 2014 +0000  Fixed results of cherry-pick
4ddc552  Dan Crankshaw  Tue Jan 7 13:44:46 2014 -0800   Added kcore implementation. (Conflicts: graph/src/main/scala/org/apache/spark/graph/Pregel.scala)
250199a  Dan Crankshaw  Mon Apr 28 22:23:49 2014 +0000  Minor logging changes.
8d22500  Ankur Dave     Fri Apr 25 16:34:53 2014 -0700  EdgePartition.size should be val
bb36cc8  Ankur Dave     Fri Apr 25 03:08:27 2014 -0700  Set locality wait
ad4c874  Ankur Dave     Sat Apr 19 20:08:57 2014 -0700  In GraphLoader, coalesce to minEdgePartitions
2898126  Ankur Dave     Sat Apr 19 20:10:36 2014 -0700  In Analytics, take PageRank numIter
719b04a  Ankur Dave     Sat Apr 19 20:09:24 2014 -0700  Log current Pregel iteration
8609184  Ankur Dave     Fri Mar 28 15:15:44 2014 -0700  Clean up WikiPipelineBenchmark
e8be08e  Ankur Dave     Fri Mar 28 15:14:50 2014 -0700  for-loop to while-loop in WikiArticle
540c267  Ankur Dave     Fri Mar 28 15:13:56 2014 -0700  Make mutable RDDs optional
e27da44  Ankur Dave     Fri Mar 28 15:12:58 2014 -0700  Check for empty iterators
5ec645d  Ankur Dave     Fri Mar 28 15:10:41 2014 -0700  In memory shuffle (cherry-picked from amplab/graphx#135)
f36e576  Ankur Dave     Fri Mar 28 15:09:23 2014 -0700  Log warning on partition recompute
c289e91  Ankur Dave     Fri Mar 28 15:08:06 2014 -0700  Untrack conf files
a61be7a  Dan Crankshaw  Tue Mar 25 07:03:41 2014 +0000  More timing logging and fixing the iterator issue.
0645183  Dan Crankshaw  Mon Mar 24 21:34:00 2014 +0000  Fixed IO bugs.
a13f3b9  Dan Crankshaw  Mon Mar 24 19:52:46 2014 +0000  Fixed compile errors
e6fc93b  Dan Crankshaw  Mon Mar 24 03:29:19 2014 +0000  Updated pipeline benchmark to handle other systems.
0a04cb9  Dan Crankshaw  Sun Mar 23 20:57:08 2014 +0000  Renamed prepostprocess
a262b07  Ankur Dave     Fri Mar 21 11:39:02 2014 -0700  Merge remote-tracking branch 'dcrankshaw-incubator-spark/gx-vldb-bench' into vldb (merge: 470a950 3342751; Conflicts: project/SparkBuild.scala)
470a950  Ankur Dave     Fri Mar 21 11:30:12 2014 -0700  Merge branch 'mutable-rdd' into vldb (merge: e09139d 3407a8f)
3342751  Dan Crankshaw  Sun Mar 2 02:14:46 2014 +0000   Fixed partitioning issues.
3407a8f  Ankur Dave     Sat Mar 1 16:39:09 2014 -0800   Try mutating old RDDs for delta updates
fad630f  Dan Crankshaw  Wed Feb 26 02:33:01 2014 +0000  more debugging. Seems like we have empty partitions.
b788bfb  Dan Crankshaw  Tue Feb 25 11:10:16 2014 +0000  added more debug logging.
75b2da7  Dan Crankshaw  Tue Feb 25 09:47:32 2014 +0000  Fixed pipeline. Now seems to work well.
7799b3e  Dan Crankshaw  Mon Feb 24 22:31:47 2014 +0000  updated slaves and conf.
95e19ea  Dan Crankshaw  Sun Feb 23 06:19:21 2014 +0000  Readadded prepost
da93886  Dan Crankshaw  Sun Feb 23 06:13:24 2014 +0000  Stringify still not working. FML.
6a4e492  Dan Crankshaw  Sun Feb 23 03:34:00 2014 +0000  Added debugging. Issue with stringify.
99d2713  Dan Crankshaw  Sat Feb 22 04:40:45 2014 +0000  Added new conf
7a036cb  Dan Crankshaw  Fri Feb 21 20:35:37 2014 -0800  Fixed compilation issues.
d3bbfd0  Dan Crankshaw  Wed Feb 19 06:15:14 2014 +0000  Beginning of new pipeline, not compiled yet.
2983132  root           Wed Feb 19 01:23:15 2014 +0000  Initial vldb benchmark commmit. Added slaves file.
1 parent e09139d commit faa4073

26 files changed: +1175 -181 lines changed

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 6 additions & 1 deletion
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDDCheckpointData
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import java.nio.ByteBuffer
 
 private[spark] object ShuffleMapTask {
 
@@ -168,7 +169,11 @@ private[spark] class ShuffleMapTask(
     var totalBytes = 0L
     var totalTime = 0L
     val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
-      writer.commit()
+      // writer.commit()
+      val bytes = writer.commit()
+      if (bytes != null) {
+        blockManager.putBytes(writer.blockId, ByteBuffer.wrap(bytes), StorageLevel.MEMORY_ONLY_SER, tellMaster = false)
+      }
       writer.close()
       val size = writer.fileSegment().length
       totalBytes += size
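
This hunk keeps each map task's shuffle output in the block manager's memory store (MEMORY_ONLY_SER) rather than relying only on the on-disk file. A minimal standalone sketch of that write path, with a plain mutable.Map standing in for Spark's BlockManager; InMemoryShuffleSketch, blocks, and writeBlock are illustrative names, not Spark API:

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import scala.collection.mutable

object InMemoryShuffleSketch {
  // Stand-in for the memory store: block id -> serialized bytes.
  private val blocks = mutable.Map.empty[String, ByteBuffer]

  def writeBlock(blockId: String, records: Iterator[(Int, String)]): Long = {
    val bos = new ByteArrayOutputStream()
    records.foreach { case (k, v) => bos.write(s"$k\t$v\n".getBytes("UTF-8")) }
    val bytes = bos.toByteArray                // analogous to writer.commit() returning Array[Byte]
    if (bytes != null) {
      blocks(blockId) = ByteBuffer.wrap(bytes) // analogous to blockManager.putBytes(..., MEMORY_ONLY_SER)
    }
    bytes.length.toLong                        // size reported back, like fileSegment().length
  }
}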

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 4 additions & 4 deletions
@@ -57,7 +57,7 @@ private[spark] class BlockManager(
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
-  private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
+  private[storage] val memoryStore = new MemoryStore(this, maxMemory)
   private[storage] val diskStore = new DiskStore(this, diskBlockManager)
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
@@ -293,7 +293,7 @@ private[spark] class BlockManager(
    * never deletes (recent) items.
    */
   def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    diskStore.getValues(blockId, serializer).orElse(
+    memoryStore.getValues(blockId, serializer).orElse(
       sys.error("Block " + blockId + " not found on disk, though it should be"))
   }
 
@@ -313,7 +313,7 @@ private[spark] class BlockManager(
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
-      diskStore.getBytes(blockId) match {
+      memoryStore.getBytes(blockId) match {
         case Some(bytes) =>
           Some(bytes)
         case None =>
@@ -831,7 +831,7 @@ private[spark] class BlockManager(
     if (info != null) info.synchronized {
       // Removals are idempotent in disk store and memory store. At worst, we get a warning.
       val removedFromMemory = memoryStore.remove(blockId)
-      val removedFromDisk = diskStore.remove(blockId)
+      val removedFromDisk = false //diskStore.remove(blockId)
       if (!removedFromMemory && !removedFromDisk) {
         logWarning("Block " + blockId + " could not be removed as it was not found in either " +
           "the disk or memory store")

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 10 additions & 14 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, File, OutputStream}
+import java.io.{ByteArrayOutputStream, FileOutputStream, File, OutputStream}
 import java.nio.channels.FileChannel
 
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -44,7 +44,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
    */
-  def commit(): Long
+  def commit(): Array[Byte]
 
   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
@@ -106,7 +106,7 @@ private[spark] class DiskBlockObjectWriter(
   /** The file channel, used for repositioning / truncating the file. */
   private var channel: FileChannel = null
   private var bs: OutputStream = null
-  private var fos: FileOutputStream = null
+  private var fos: ByteArrayOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()
@@ -115,9 +115,8 @@ private[spark] class DiskBlockObjectWriter(
   private var _timeWriting = 0L
 
   override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
+    fos = new ByteArrayOutputStream()
     ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
     lastValidPosition = initialPosition
     bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
@@ -130,9 +129,6 @@ private[spark] class DiskBlockObjectWriter(
     if (syncWrites) {
       // Force outstanding writes to disk and track how long it takes
       objOut.flush()
-      val start = System.nanoTime()
-      fos.getFD.sync()
-      _timeWriting += System.nanoTime() - start
     }
     objOut.close()
 
@@ -149,18 +145,18 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commit(): Array[Byte] = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
       val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
+      lastValidPosition = fos.size()
+      fos.toByteArray
     } else {
       // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      null
     }
   }
 
@@ -170,7 +166,7 @@ private[spark] class DiskBlockObjectWriter(
     // truncate the file to the last valid position.
     objOut.flush()
     bs.flush()
-    channel.truncate(lastValidPosition)
+    throw new UnsupportedOperationException("Revert temporarily broken due to in memory shuffle code changes.")
  }
 }
 
@@ -182,7 +178,7 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   override def fileSegment(): FileSegment = {
-    new FileSegment(file, initialPosition, bytesWritten)
+    new FileSegment(null, initialPosition, bytesWritten)
   }
 
   // Only valid if called after close()
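
The writer now buffers serialized records in a ByteArrayOutputStream so commit() can hand the bytes back to the caller instead of reporting a file offset delta. A reduced sketch of that pattern using only java.io; ByteArrayBlockWriter is an illustrative name, and the real class additionally wraps a compression stream and a Spark serializer stream:

import java.io.{ByteArrayOutputStream, DataOutputStream}

class ByteArrayBlockWriter {
  private val fos = new ByteArrayOutputStream()
  private val out = new DataOutputStream(fos)
  private var lastValidPosition = 0

  def write(record: String): Unit = out.writeUTF(record)

  // Flush buffered writes and return everything written so far,
  // mirroring the new `def commit(): Array[Byte]` contract above.
  def commit(): Array[Byte] = {
    out.flush()
    lastValidPosition = fos.size()
    fos.toByteArray
  }
}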

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 9 additions & 0 deletions
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.serializer.Serializer
 
 /**
  * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
@@ -119,6 +120,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
+  /**
+   * A version of getValues that allows a custom serializer. This is used as part of the
+   * shuffle short-circuit code.
+   */
+  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
+    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
+  }
+
   override def remove(blockId: BlockId): Boolean = {
     entries.synchronized {
       val entry = entries.remove(blockId)
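
The new getValues overload short-circuits the usual read path by deserializing the cached bytes with a caller-supplied serializer. A simplified stand-in for that shape; SimpleStore and the ByteBuffer => Iterator[Any] codec parameter are assumptions for illustration, not Spark API:

import java.nio.ByteBuffer
import scala.collection.mutable

class SimpleStore {
  private val entries = mutable.Map.empty[String, ByteBuffer]

  def putBytes(id: String, bytes: ByteBuffer): Unit = entries(id) = bytes
  def getBytes(id: String): Option[ByteBuffer] = entries.get(id)

  // Mirrors the overload above: look up serialized bytes, then decode them
  // with whatever "serializer" the shuffle code hands in.
  def getValues(id: String, deserialize: ByteBuffer => Iterator[Any]): Option[Iterator[Any]] =
    getBytes(id).map(deserialize)
}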

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 12 additions & 1 deletion
@@ -187,6 +187,17 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         }
       })
   }
+
+  def removeAllShuffleStuff() {
+    for (state <- shuffleStates.values;
+         group <- state.allFileGroups;
+         (mapId, _) <- group.mapIdToIndex.iterator;
+         reducerId <- 0 until group.files.length) {
+      val blockId = new ShuffleBlockId(group.shuffleId, mapId, reducerId)
+      blockManager.removeBlock(blockId, tellMaster = false)
+    }
+    shuffleStates.clear()
+  }
 }
 
 private[spark]
@@ -200,7 +211,7 @@ object ShuffleBlockManager {
    * Stores the absolute index of each mapId in the files of this group. For instance,
    * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
    */
-  private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+  val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
   /**
    * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
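
removeAllShuffleStuff() enumerates every (shuffleId, mapId, reducerId) combination that was written, drops the corresponding block, then clears the bookkeeping. A sketch of that enumeration against a simplified stand-in for the internal shuffle state (ShuffleState and removeBlock here are illustrative, not Spark types):

object ShuffleCleanupSketch {
  case class ShuffleState(shuffleId: Int, mapIds: Seq[Int], numReducers: Int)

  def removeAllShuffleBlocks(states: Iterable[ShuffleState],
                             removeBlock: String => Unit): Unit = {
    // Same nested enumeration as the for-comprehension above.
    for (state <- states;
         mapId <- state.mapIds;
         reducerId <- 0 until state.numReducers) {
      removeBlock(s"shuffle_${state.shuffleId}_${mapId}_$reducerId")
    }
  }
}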

graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ package org.apache.spark.graphx
 /**
  * The direction of a directed edge relative to a vertex.
  */
-class EdgeDirection private (private val name: String) extends Serializable {
+class EdgeDirection (private val name: String) extends Serializable {
   /**
    * Reverse the direction of an edge. An in becomes out,
    * out becomes in and both and either remain the same.

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 19 additions & 6 deletions
@@ -45,7 +45,12 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
+    val partIter = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
+    if (partIter.hasNext) {
+      partIter.next._2.iterator
+    } else {
+      Iterator.empty
+    }
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -69,8 +74,12 @@ class EdgeRDD[@specialized ED: ClassTag](
   private[graphx] def mapEdgePartitions[ED2: ClassTag](
       f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
     new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-      val (pid, ep) = iter.next()
-      Iterator(Tuple2(pid, f(pid, ep)))
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
     }, preservesPartitioning = true))
   }
 
@@ -107,9 +116,13 @@ class EdgeRDD[@specialized ED: ClassTag](
     val ed3Tag = classTag[ED3]
     new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
-        val (pid, thisEPart) = thisIter.next()
-        val (_, otherEPart) = otherIter.next()
-        Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+        if (thisIter.hasNext && otherIter.hasNext) {
+          val (pid, thisEPart) = thisIter.next()
+          val (_, otherEPart) = otherIter.next()
+          Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+        } else {
+          Iterator.empty
+        }
     })
   }
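
All three hunks apply the same guard: never call next() on a partition iterator without checking hasNext, since with these changes a partition iterator can turn out to be empty (compare the "Check for empty iterators" commit above). The pattern, factored out generically; mapHeadOrEmpty is an illustrative helper, not part of GraphX:

object IteratorGuards {
  // Apply f to the first element if there is one, otherwise yield nothing,
  // instead of calling iter.next() unguarded.
  def mapHeadOrEmpty[A, B](iter: Iterator[A])(f: A => Iterator[B]): Iterator[B] =
    if (iter.hasNext) f(iter.next()) else Iterator.empty

  // e.g. mapHeadOrEmpty(partIter) { case (pid, part) => part.iterator }
}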

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 1 addition & 1 deletion
@@ -335,7 +335,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * }
    * }}}
    */
-  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)], destructive: Boolean = false)
       (mapFunc: (VertexId, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
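
outerJoinVertices gains a destructive flag, the hook for the in-place/destructive vertex-replication updates named in the commit message (default false, so existing callers are unaffected). A hedged usage sketch against this patched build; the graph and the placeholder rank values are assumptions for illustration:

import org.apache.spark.graphx._

object DestructiveJoinExample {
  // Join per-vertex values back into the graph, allowing the old vertex
  // partitions to be updated in place when destructive = true.
  def refreshRanks(graph: Graph[Double, Int]): Graph[Double, Int] = {
    val ranks = graph.vertices.mapValues(_ => 1.0)
    graph.outerJoinVertices(ranks, destructive = true) {
      (vid, oldAttr, rankOpt) => rankOpt.getOrElse(oldAttr)
    }
  }
}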

graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala

Lines changed: 13 additions & 0 deletions
@@ -20,9 +20,15 @@ package org.apache.spark.graphx
 import com.esotericsoftware.kryo.Kryo
 
 import org.apache.spark.graphx.impl._
+// import org.apache.spark.examples.graphx._
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.BoundedPriorityQueue
+import scala.collection.mutable
+import org.apache.hadoop.io.{LongWritable, Text}
+import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
+// import org.apache.hadoop.conf.Configuration
+import org.apache.mahout.text.wikipedia._
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -41,6 +47,13 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[PartitionStrategy])
     kryo.register(classOf[BoundedPriorityQueue[Object]])
     kryo.register(classOf[EdgeDirection])
+    kryo.register(classOf[mutable.HashSet[VertexId]])
+    kryo.register(classOf[XmlInputFormat])
+    kryo.register(classOf[LongWritable])
+    kryo.register(classOf[Text])
+    kryo.register(classOf[WikiArticle])
+    // kryo.register(classOf[JHashSet[VertexId]])
+    kryo.register(classOf[JTreeSet[VertexId]])
 
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
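
These registrations only take effect when a job is configured to use Kryo with this registrator. The standard wiring, for reference; the property names are the ones Spark used in this era, and the app name is arbitrary:

import org.apache.spark.SparkConf

object KryoSetup {
  val conf = new SparkConf()
    .setAppName("WikiPipelineBenchmark")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
}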

graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala

Lines changed: 18 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
+import org.apache.spark.rdd.RDD
 
 /**
  * Provides utilities for loading [[Graph]]s from files.
@@ -60,7 +61,7 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
+    val edges = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
       val builder = new EdgePartitionBuilder[Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
@@ -86,4 +87,20 @@
     GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
   } // end of edgeListFile
 
+  def loadVertices(sc: SparkContext, vertexPath: String, delimiter: String = "\\s+"): RDD[(VertexId, String)] = {
+
+    val vertices = sc.textFile(vertexPath, 128).mapPartitions( iter =>
+      iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
+        val lineArray = line.split(delimiter)
+        if(lineArray.length < 2) {
+          println("Invalid line: " + line)
+          assert(false)
+        }
+        val id = lineArray(0).trim.toLong
+        val attr = lineArray.slice(1,lineArray.length).mkString(" ")
+        (id, attr)
+      })
+    vertices
+  }
+
 }
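
loadVertices pairs naturally with edgeListFile: build the topology from the edge list, then join the string attributes onto it by vertex id. A hedged usage sketch; the paths, the function name, and the empty-string default for unmatched vertices are assumptions:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

object VertexLoadExample {
  def loadWithTitles(sc: SparkContext, edgePath: String, vertexPath: String): Graph[String, Int] = {
    val skeleton = GraphLoader.edgeListFile(sc, edgePath)   // Graph[Int, Int] topology
    val titles = GraphLoader.loadVertices(sc, vertexPath)   // RDD[(VertexId, String)] attributes
    skeleton.outerJoinVertices(titles) { (vid, _, title) => title.getOrElse("") }
  }
}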
