@@ -215,12 +215,12 @@ protected void processPath(PathData item) throws IOException {
215215 }
216216
217217 protected class TextRecordInputStream extends InputStream {
218- SequenceFile .Reader r ;
218+ final SequenceFile .Reader r ;
219219 Object key ;
220220 Object val ;
221221
222- DataInputBuffer inbuf ;
223- DataOutputBuffer outbuf ;
222+ final DataInputBuffer inbuf ;
223+ final DataOutputBuffer outbuf ;
224224
225225 public TextRecordInputStream (FileStatus f ) throws IOException {
226226 final Path fpath = f .getPath ();
@@ -237,30 +237,67 @@ public TextRecordInputStream(FileStatus f) throws IOException {
237237 public int read () throws IOException {
238238 int ret ;
239239 if (null == inbuf || -1 == (ret = inbuf .read ())) {
240- key = r .next (key );
241- if (key == null ) {
242- return -1 ;
240+ if (!readNextFromSequenceFile ()) {
241+ ret = -1 ;
243242 } else {
244- val = r . getCurrentValue ( val );
243+ ret = inbuf . read ( );
245244 }
246- byte [] tmp = key .toString ().getBytes (StandardCharsets .UTF_8 );
247- outbuf .write (tmp , 0 , tmp .length );
248- outbuf .write ('\t' );
249- tmp = val .toString ().getBytes (StandardCharsets .UTF_8 );
250- outbuf .write (tmp , 0 , tmp .length );
251- outbuf .write ('\n' );
252- inbuf .reset (outbuf .getData (), outbuf .getLength ());
253- outbuf .reset ();
254- ret = inbuf .read ();
255245 }
256246 return ret ;
257247 }
258248
249+ @ Override
250+ public int read (byte [] dest , int destPos , int destLen ) throws IOException {
251+ validateInputStreamReadArguments (dest , destPos , destLen );
252+
253+ if (destLen == 0 ) {
254+ return 0 ;
255+ }
256+
257+ int bytesRead = 0 ;
258+ while (destLen > 0 ) {
259+ // Attempt to copy buffered data.
260+ int copyLen = inbuf .read (dest , destPos , destLen );
261+ if (-1 == copyLen ) {
262+ // There was no buffered data.
263+ if (!readNextFromSequenceFile ()) {
264+ // There is also no data remaining in the file.
265+ break ;
266+ }
267+ // Reattempt copy now that we have buffered data.
268+ copyLen = inbuf .read (dest , destPos , destLen );
269+ }
270+ bytesRead += copyLen ;
271+ destPos += copyLen ;
272+ destLen -= copyLen ;
273+ }
274+
275+ return bytesRead > 0 ? bytesRead : -1 ;
276+ }
277+
259278 @ Override
260279 public void close () throws IOException {
261280 r .close ();
262281 super .close ();
263282 }
283+
284+ private boolean readNextFromSequenceFile () throws IOException {
285+ key = r .next (key );
286+ if (key == null ) {
287+ return false ;
288+ } else {
289+ val = r .getCurrentValue (val );
290+ }
291+ byte [] tmp = key .toString ().getBytes (StandardCharsets .UTF_8 );
292+ outbuf .write (tmp , 0 , tmp .length );
293+ outbuf .write ('\t' );
294+ tmp = val .toString ().getBytes (StandardCharsets .UTF_8 );
295+ outbuf .write (tmp , 0 , tmp .length );
296+ outbuf .write ('\n' );
297+ inbuf .reset (outbuf .getData (), outbuf .getLength ());
298+ outbuf .reset ();
299+ return true ;
300+ }
264301 }
265302
266303 /**
@@ -270,10 +307,11 @@ public void close() throws IOException {
270307 protected static class AvroFileInputStream extends InputStream {
271308 private int pos ;
272309 private byte [] buffer ;
273- private ByteArrayOutputStream output ;
274- private FileReader <?> fileReader ;
275- private DatumWriter <Object > writer ;
276- private JsonEncoder encoder ;
310+ private final ByteArrayOutputStream output ;
311+ private final FileReader <?> fileReader ;
312+ private final DatumWriter <Object > writer ;
313+ private final JsonEncoder encoder ;
314+ private final byte [] finalSeparator ;
277315
278316 public AvroFileInputStream (FileStatus status ) throws IOException {
279317 pos = 0 ;
@@ -286,31 +324,96 @@ public AvroFileInputStream(FileStatus status) throws IOException {
286324 writer = new GenericDatumWriter <Object >(schema );
287325 output = new ByteArrayOutputStream ();
288326 encoder = EncoderFactory .get ().jsonEncoder (schema , output );
327+ finalSeparator = System .getProperty ("line.separator" ).getBytes (StandardCharsets .UTF_8 );
289328 }
290329
291330 /**
292331 * Read a single byte from the stream.
293332 */
294333 @ Override
295334 public int read () throws IOException {
335+ if (buffer == null ) {
336+ return -1 ;
337+ }
338+
296339 if (pos < buffer .length ) {
297340 return buffer [pos ++];
298341 }
342+
299343 if (!fileReader .hasNext ()) {
344+ // Unset buffer to signal EOF on future calls.
345+ buffer = null ;
300346 return -1 ;
301347 }
348+
302349 writer .write (fileReader .next (), encoder );
303350 encoder .flush ();
351+
304352 if (!fileReader .hasNext ()) {
305- // Write a new line after the last Avro record.
306- output .write (System .getProperty ("line.separator" )
307- .getBytes (StandardCharsets .UTF_8 ));
308- output .flush ();
353+ if (buffer .length > 0 ) {
354+ // Write a new line after the last Avro record.
355+ output .write (finalSeparator );
356+ output .flush ();
357+ }
358+ }
359+
360+ swapBuffer ();
361+ return read ();
362+ }
363+
364+ @ Override
365+ public int read (byte [] dest , int destPos , int destLen ) throws IOException {
366+ validateInputStreamReadArguments (dest , destPos , destLen );
367+
368+ if (destLen == 0 ) {
369+ return 0 ;
370+ }
371+
372+ if (buffer == null ) {
373+ return -1 ;
374+ }
375+
376+ int bytesRead = 0 ;
377+ while (destLen > 0 && buffer != null ) {
378+ if (pos < buffer .length ) {
379+ // We have buffered data available, either from the Avro file or the final separator.
380+ int copyLen = Math .min (buffer .length - pos , destLen );
381+ System .arraycopy (buffer , pos , dest , destPos , copyLen );
382+ pos += copyLen ;
383+ bytesRead += copyLen ;
384+ destPos += copyLen ;
385+ destLen -= copyLen ;
386+ } else if (buffer == finalSeparator ) {
387+ // There is no buffered data, and the last buffer processed was the final separator.
388+ // Unset buffer to signal EOF on future calls.
389+ buffer = null ;
390+ } else if (!fileReader .hasNext ()) {
391+ if (buffer .length > 0 ) {
392+ // There is no data remaining in the file. Get ready to write the final separator on
393+ // the next iteration.
394+ buffer = finalSeparator ;
395+ pos = 0 ;
396+ } else {
397+ // We never read data into the buffer. This must be an empty file.
398+ // Immediate EOF, no separator needed.
399+ buffer = null ;
400+ return -1 ;
401+ }
402+ } else {
403+ // Read the next data from the file into the buffer.
404+ writer .write (fileReader .next (), encoder );
405+ encoder .flush ();
406+ swapBuffer ();
407+ }
309408 }
409+
410+ return bytesRead ;
411+ }
412+
413+ private void swapBuffer () {
310414 pos = 0 ;
311415 buffer = output .toByteArray ();
312416 output .reset ();
313- return read ();
314417 }
315418
316419 /**
@@ -323,4 +426,14 @@ public void close() throws IOException {
323426 super .close ();
324427 }
325428 }
429+
430+ private static void validateInputStreamReadArguments (byte [] dest , int destPos , int destLen )
431+ throws IOException {
432+ if (dest == null ) {
433+ throw new NullPointerException ("null destination buffer" );
434+ } else if (destPos < 0 || destLen < 0 || destLen > dest .length - destPos ) {
435+ throw new IndexOutOfBoundsException (String .format (
436+ "invalid destination buffer range: destPos = %d, destLen = %d" , destPos , destLen ));
437+ }
438+ }
326439}
0 commit comments