
Commit b6e852d

wangyum authored and GitHub Enterprise committed
[CARMEL-7470][CARMEL-6471] Upload API sometimes throws Delegation Token error (apache#146)
1 parent e66717c commit b6e852d

File tree: 1 file changed (+37, −5 lines)


core/src/main/scala/org/apache/spark/input/PortableDataStream.scala

Lines changed: 37 additions & 5 deletions
@@ -17,26 +17,29 @@
 
 package org.apache.spark.input
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, IOException}
+import java.util
 
 import scala.collection.JavaConverters._
 
+import com.google.common.collect.Lists
 import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
+import org.apache.hadoop.mapred.LocatedFileStatusFetcher
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit, FileInputFormat}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
-import org.apache.spark.internal.config
+import org.apache.spark.internal.{config, Logging}
 
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
  */
 private[spark] abstract class StreamFileInputFormat[T]
-  extends CombineFileInputFormat[String, T]
+  extends CombineFileInputFormat[String, T] with Logging
 {
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
 
@@ -69,6 +72,35 @@ private[spark] abstract class StreamFileInputFormat[T]
 
   def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
 
+  override protected def listStatus(job: JobContext): util.List[FileStatus] = {
+    val dirs: Array[Path] = FileInputFormat.getInputPaths(job)
+    if (dirs.length == 0) throw new IOException("No input paths specified in job")
+    // Whether we need to recursive look into the directory structure
+    val recursive = FileInputFormat.getInputDirRecursive(job)
+    val jobFilter: PathFilter = FileInputFormat.getInputPathFilter(job)
+    val inputFilter = new PathFilter {
+      override def accept(path: Path): Boolean = {
+        val name = path.getName
+        val hidden = name.startsWith("_") || name.startsWith(".")
+        !hidden && (
+          if (jobFilter != null) {
+            jobFilter.accept(path)
+          } else {
+            true
+          })
+      }
+    }
+
+    var result: util.List[FileStatus] = null
+
+    val locatedFileStatusFetcher =
+      new LocatedFileStatusFetcher(job.getConfiguration, dirs, recursive, inputFilter, true)
+    val locatedFiles = locatedFileStatusFetcher.getFileStatuses()
+    result = Lists.newArrayList(locatedFiles)
+
+    logInfo("Total input paths to process : " + result.size)
+    result
+  }
 }
 
 /**
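
For context, StreamFileInputFormat backs SparkContext.binaryFiles (via StreamInputFormat), so computing partitions for a binary-files RDD goes through the listStatus override added above, which now lists the input paths with LocatedFileStatusFetcher. A minimal sketch of a driver program that exercises this path is shown below; the application name, master, and input directory are illustrative assumptions and are not part of the commit.

    import org.apache.spark.{SparkConf, SparkContext}

    object BinaryFilesListingExample {
      def main(args: Array[String]): Unit = {
        // Illustrative local setup; any SparkContext reaches the same code path.
        val conf = new SparkConf()
          .setAppName("binaryFiles listing example")
          .setMaster("local[*]")
        val sc = new SparkContext(conf)

        // Read a directory of whole files as (path, PortableDataStream) pairs.
        // Partition planning for this RDD calls StreamFileInputFormat.listStatus,
        // which after this change lists the inputs via LocatedFileStatusFetcher.
        val files = sc.binaryFiles("/tmp/upload-staging") // hypothetical input directory
        files
          .map { case (path, stream) => (path, stream.toArray().length) }
          .collect()
          .foreach { case (path, size) => println(s"$path -> $size bytes") }

        sc.stop()
      }
    }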
