@@ -1092,7 +1092,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
 
-      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
+      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
+        initHadoopOutputMetrics(context)
 
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
       require(writer != null, "Unable to obtain RecordWriter")
@@ -1103,15 +1104,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           writer.write(pair._1, pair._2)
 
           // Update bytes written metric every few records
-          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
       } {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
-      outputMetrics.foreach { om =>
-        bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) }
+      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+        om.setBytesWritten(callback())
         om.setRecordsWritten(recordsWritten)
       }
       1
@@ -1179,7 +1180,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
 
-      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
+      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
+        initHadoopOutputMetrics(context)
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
@@ -1191,15 +1193,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
 
           // Update bytes written metric every few records
-          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
       } {
         writer.close()
       }
       writer.commit()
-      outputMetrics.foreach { om =>
-        bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) }
+      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+        om.setBytesWritten(callback())
         om.setRecordsWritten(recordsWritten)
       }
     }
@@ -1211,25 +1213,21 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   // TODO: these don't seem like the right abstractions.
   // We should abstract the duplicate code in a less awkward way.
 
+  // return type: (output metrics, bytes written callback), defined only if the latter is defined
   private def initHadoopOutputMetrics(
-      context: TaskContext): (Option[OutputMetrics], Option[() => Long]) = {
+      context: TaskContext): Option[(OutputMetrics, () => Long)] = {
     val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
-    val outputMetrics =
-      if (bytesWrittenCallback.isDefined) {
-        Some(context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop))
-      } else {
-        None
-      }
-    (outputMetrics, bytesWrittenCallback)
+    bytesWrittenCallback.map { b =>
+      (context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop), b)
+    }
   }
 
   private def maybeUpdateOutputMetrics(
-      bytesWrittenCallback: Option[() => Long],
-      outputMetrics: Option[OutputMetrics],
+      outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
       recordsWritten: Long): Unit = {
-    outputMetrics.foreach { om =>
-      if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
-        bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) }
+    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
+      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+        om.setBytesWritten(callback())
         om.setRecordsWritten(recordsWritten)
       }
     }
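
For readers following the refactoring, here is a minimal standalone sketch of the shape change this commit makes: replacing `(Option[A], Option[B])` with `Option[(A, B)]` so the "both defined or neither" invariant lives in the type rather than in caller discipline. `FakeOutputMetrics` and `fakeBytesWrittenCallback` are hypothetical stand-ins for Spark's internal metrics class and filesystem callback, not the real implementations.

```scala
// Sketch of collapsing two coupled Options into one Option of a pair.
// FakeOutputMetrics / fakeBytesWrittenCallback are hypothetical stand-ins.
object OptionPairSketch {
  final class FakeOutputMetrics {
    var bytesWritten: Long = 0L
    var recordsWritten: Long = 0L
  }

  // Stand-in for a callback that may be unavailable (hence the Option),
  // analogous to getFSBytesWrittenOnThreadCallback() in the diff above.
  def fakeBytesWrittenCallback: Option[() => Long] = Some(() => 42L)

  // After the refactoring: the metrics object is created exactly when the
  // callback exists, via a single map instead of an if/else over isDefined.
  def initOutputMetrics(): Option[(FakeOutputMetrics, () => Long)] =
    fakeBytesWrittenCallback.map(cb => (new FakeOutputMetrics, cb))

  def main(args: Array[String]): Unit = {
    initOutputMetrics().foreach { case (om, callback) =>
      om.bytesWritten = callback()  // one destructuring replaces nested foreach
      om.recordsWritten = 1L
      println(s"bytes=${om.bytesWritten}, records=${om.recordsWritten}")
    }
  }
}
```

With the paired type, a mismatched state (metrics registered but no callback, or vice versa) is unrepresentable, which is what lets the diff delete the nested `foreach` blocks at every call site.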