@@ -51,6 +51,75 @@ private[spark] abstract class StreamFileInputFormat[T]
 
 }
 
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
+ * to read files out as streams
+ */
+private[spark] abstract class StreamBasedRecordReader[T](
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, T] {
+
+  // True means the current file has been processed and should be skipped.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = new PortableDataStream(split, context, index)
+      value = parseStream(fileIn)
+      fileIn.close() // if it has not been opened yet, close does nothing
+      key = fileIn.getPath
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Parse the stream (and close it afterwards) and return the value as type T
+   * @param inStream the stream to be read in
+   * @return the data formatted as type T
+   */
+  def parseStream(inStream: PortableDataStream): T
+}
+
+/**
+ * Reads the record in directly as a stream for other objects to manipulate and handle
+ */
+private[spark] class StreamRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
+
+  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
+}
+
+/**
+ * The format for the PortableDataStream files
+ */
+private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
+  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
+  {
+    new CombineFileRecordReader[String, PortableDataStream](
+      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
+  }
+}
+
 /**
  * A class that allows DataStreams to be serialized and moved around by not creating them
  * until they need to be read
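
Reviewer note (not part of the patch): a concrete reader only has to implement parseStream; nextKeyValue above handles the key bookkeeping and closes the stream afterwards in any case. As a hedged sketch, assuming PortableDataStream exposes the toArray(): Array[Byte] helper it has in released Spark, a reader that materializes each file's bytes could look like this (ByteArrayRecordReader is a hypothetical name):

  private[spark] class ByteArrayRecordReader(
      split: CombineFileSplit,
      context: TaskAttemptContext,
      index: Integer)
    extends StreamBasedRecordReader[Array[Byte]](split, context, index) {

    // toArray() (assumed from PortableDataStream's API) reads the whole
    // file behind the stream and closes the underlying input.
    def parseStream(inStream: PortableDataStream): Array[Byte] = inStream.toArray()
  }
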
@@ -143,72 +212,3 @@ class PortableDataStream(@transient isplit: CombineFileSplit,
   def getPath(): String = path
 }
 
-/**
- * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
- * to read files out as streams
- */
-private[spark] abstract class StreamBasedRecordReader[T](
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends RecordReader[String, T] {
-
-  // True means the current file has been processed and should be skipped.
-  private var processed = false
-
-  private var key = ""
-  private var value: T = null.asInstanceOf[T]
-
-  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
-  override def close() = {}
-
-  override def getProgress = if (processed) 1.0f else 0.0f
-
-  override def getCurrentKey = key
-
-  override def getCurrentValue = value
-
-  override def nextKeyValue = {
-    if (!processed) {
-      val fileIn = new PortableDataStream(split, context, index)
-      value = parseStream(fileIn)
-      fileIn.close() // if it has not been opened yet, close does nothing
-      key = fileIn.getPath
-      processed = true
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
-   * Parse the stream (and close it afterwards) and return the value as type T
-   * @param inStream the stream to be read in
-   * @return the data formatted as type T
-   */
-  def parseStream(inStream: PortableDataStream): T
-}
-
-/**
- * Reads the record in directly as a stream for other objects to manipulate and handle
- */
-private[spark] class StreamRecordReader(
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
-
-  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
-}
-
-/**
- * The format for the PortableDataStream files
- */
-private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
-  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
-  {
-    new CombineFileRecordReader[String, PortableDataStream](
-      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
-  }
-}
-
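
Usage sketch (not part of the diff): these classes back SparkContext.binaryFiles in released Spark, which yields one (path, PortableDataStream) pair per file, with the bytes read only when the stream is consumed on an executor. Assuming that entry point:

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("binary-files-demo").setMaster("local"))

  // One record per file under the (illustrative) path; each stream stays
  // unopened until toArray() or open() is called, so the pairs are cheap
  // to move between stages.
  val fileSizes = sc.binaryFiles("hdfs:///data/blobs")
    .mapValues(stream => stream.toArray().length)

  fileSizes.collect().foreach { case (path, size) => println(s"$path: $size bytes") }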