@@ -1269,11 +1269,55 @@ abstract class RDD[T: ClassTag](
1269
1269
1270
1270
/** A description of this RDD and its recursive dependencies for debugging. */
1271
1271
def toDebugString : String = {
1272
- def debugString (rdd : RDD [_], prefix : String = " " ): Seq [String ] = {
1273
- Seq (prefix + rdd + " (" + rdd.partitions.size + " partitions)" ) ++
1274
- rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " " ))
1272
+ // Apply a different rule to the last child
1273
+ def debugChildren (rdd : RDD [_], prefix : String ): Seq [String ] = {
1274
+ val len = rdd.dependencies.length
1275
+ len match {
1276
+ case 0 => Seq .empty
1277
+ case 1 =>
1278
+ val d = rdd.dependencies.head
1279
+ debugString(d.rdd, prefix, d.isInstanceOf [ShuffleDependency [_,_,_]], true )
1280
+ case _ =>
1281
+ val frontDeps = rdd.dependencies.take(len - 1 )
1282
+ val frontDepStrings = frontDeps.flatMap(
1283
+ d => debugString(d.rdd, prefix, d.isInstanceOf [ShuffleDependency [_,_,_]]))
1284
+
1285
+ val lastDep = rdd.dependencies.last
1286
+ val lastDepStrings =
1287
+ debugString(lastDep.rdd, prefix, lastDep.isInstanceOf [ShuffleDependency [_,_,_]], true )
1288
+
1289
+ (frontDepStrings ++ lastDepStrings)
1290
+ }
1291
+ }
1292
+ // The first RDD in the dependency stack has no parents, so no need for a +-
1293
+ def firstDebugString (rdd : RDD [_]): Seq [String ] = {
1294
+ val partitionStr = " (" + rdd.partitions.size + " )"
1295
+ val leftOffset = (partitionStr.length - 1 ) / 2
1296
+ val nextPrefix = (" " * leftOffset) + " |" + (" " * (partitionStr.length - leftOffset))
1297
+ Seq (partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
1298
+ }
1299
+ def shuffleDebugString (rdd : RDD [_], prefix : String = " " , isLastChild : Boolean ): Seq [String ] = {
1300
+ val partitionStr = " (" + rdd.partitions.size + " )"
1301
+ val leftOffset = (partitionStr.length - 1 ) / 2
1302
+ val thisPrefix = prefix.replaceAll(" \\ |\\ s+$" , " " )
1303
+ val nextPrefix = (
1304
+ thisPrefix
1305
+ + (if (isLastChild) " " else " | " )
1306
+ + (" " * leftOffset) + " |" + (" " * (partitionStr.length - leftOffset)))
1307
+ Seq (thisPrefix + " +-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
1308
+ }
1309
+ def debugString (rdd : RDD [_],
1310
+ prefix : String = " " ,
1311
+ isShuffle : Boolean = true ,
1312
+ isLastChild : Boolean = false ): Seq [String ] = {
1313
+ if (isShuffle) {
1314
+ shuffleDebugString(rdd, prefix, isLastChild)
1315
+ }
1316
+ else {
1317
+ Seq (prefix + rdd) ++ debugChildren(rdd, prefix)
1318
+ }
1275
1319
}
1276
- debugString (this ).mkString(" \n " )
1320
+ firstDebugString (this ).mkString(" \n " )
1277
1321
}
1278
1322
1279
1323
override def toString : String = " %s%s[%d] at %s" .format(
0 commit comments