1 file changed: +9 −3 lines
core/src/main/scala/org/apache/spark/input

@@ -61,16 +61,22 @@ abstract class StreamFileInputFormat[T]
   */
 class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
   extends Serializable {
-  private var path = ""
+
   private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
   private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+    val pathp = split.getPath(index)
+    pathp.toString
+  }
 
   /**
    * create a new DataInputStream from the split and context
    */
   def open(): FSDataInputStream = {
     val pathp = split.getPath(index)
-    path = pathp.toString
     val fs = pathp.getFileSystem(context.getConfiguration)
     fileIn = fs.open(pathp)
     isOpen = true
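
The first hunk decouples the path from `open()`: `path` becomes a `lazy val` derived from the split, so `getPath` can be answered without ever opening the underlying stream. A minimal, self-contained sketch of that pattern, using a hypothetical `SimpleDataStream` backed by a plain in-memory stream in place of Hadoop's `CombineFileSplit` and `FSDataInputStream` (not Spark's actual class):

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Hypothetical stand-in for PortableDataStream, for illustration only.
class SimpleDataStream(fileName: String, contents: Array[Byte]) extends Serializable {
  private var fileIn: InputStream = null
  private var isOpen = false

  // Computed on first access, independently of open(), mirroring the lazy val above.
  private lazy val path: String = fileName

  def getPath: String = path

  def open(): InputStream = {
    fileIn = new ByteArrayInputStream(contents)
    isOpen = true
    fileIn
  }

  // Closing does nothing if the stream was never opened.
  def close(): Unit = if (isOpen) { fileIn.close(); isOpen = false }
}
```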
@@ -126,9 +132,9 @@ abstract class StreamBasedRecordReader[T](
   override def nextKeyValue = {
     if (!processed) {
       val fileIn = new PortableDataStream(split, context, index)
-      key = fileIn.getPath
       value = parseStream(fileIn)
       fileIn.close() // if it has not been open yet, close does nothing
+      key = fileIn.getPath
       processed = true
       true
     } else {
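
The second hunk relies on that lazy path: `key = fileIn.getPath` now runs after `close()`, which is only safe because the path no longer requires an open stream. A hedged usage sketch against the hypothetical `SimpleDataStream` above, where the parse step is just a byte count rather than Spark's actual `parseStream`:

```scala
// Reads one record in the same order as nextKeyValue above:
// parse, close, then ask for the key.
def readOneRecord(stream: SimpleDataStream): (String, Int) = {
  val in = stream.open()
  val value = Iterator.continually(in.read()).takeWhile(_ != -1).length
  stream.close()              // closing does not invalidate the path
  val key = stream.getPath    // mirrors "key = fileIn.getPath" after close()
  (key, value)
}

val record = readOneRecord(new SimpleDataStream("hdfs://host/part-00000", Array[Byte](1, 2, 3)))
// record == ("hdfs://host/part-00000", 3)
```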