1414 * See the License for the specific language governing permissions and
1515 * limitations under the License.
1616 */
17-
1817package org .apache .arrow .adapter .avro ;
1918
19+ import java .io .*;
20+ import java .nio .ByteBuffer ;
21+ import java .nio .charset .StandardCharsets ;
22+ import java .util .ArrayList ;
23+ import java .util .List ;
24+ import java .util .Set ;
2025import org .apache .arrow .adapter .avro .consumers .CompositeAvroConsumer ;
2126import org .apache .arrow .memory .BufferAllocator ;
2227import org .apache .arrow .vector .FieldVector ;
2934import org .apache .avro .io .BinaryDecoder ;
3035import org .apache .avro .io .DecoderFactory ;
3136
32- import java .io .*;
33- import java .nio .ByteBuffer ;
34- import java .nio .charset .StandardCharsets ;
35- import java .util .ArrayList ;
36- import java .util .List ;
37- import java .util .Set ;
38-
39-
4037class AvroFileReader implements DictionaryProvider {
4138
4239 // Reader owns a channel / decoder and will close them
@@ -69,20 +66,18 @@ class AvroFileReader implements DictionaryProvider {
6966
7067 // Create a new AvroFileReader for the input stream
7168 // In order to support non-blocking mode, the stream must support mark / reset
72- public AvroFileReader (
73- InputStream stream ,
74- BufferAllocator allocator ,
75- boolean blocking ) {
69+ public AvroFileReader (InputStream stream , BufferAllocator allocator , boolean blocking ) {
7670
77- this .stream =stream ;
71+ this .stream = stream ;
7872 this .allocator = allocator ;
7973 this .blocking = blocking ;
8074
8175 if (blocking ) {
8276 this .decoder = DecoderFactory .get ().binaryDecoder (stream , null );
8377 } else {
8478 if (!stream .markSupported ()) {
85- throw new IllegalArgumentException ("Input stream must support mark/reset for non-blocking mode" );
79+ throw new IllegalArgumentException (
80+ "Input stream must support mark/reset for non-blocking mode" );
8681 }
8782 this .decoder = DecoderFactory .get ().directBinaryDecoder (stream , null );
8883 }
@@ -107,9 +102,8 @@ void readHeader() throws IOException {
107102 headerSize += magic .length ;
108103
109104 // Validate Avro magic
110- int validateMagic = BinaryData .compareBytes (
111- AVRO_MAGIC , 0 , AVRO_MAGIC .length ,
112- magic , 0 , AVRO_MAGIC .length );
105+ int validateMagic =
106+ BinaryData .compareBytes (AVRO_MAGIC , 0 , AVRO_MAGIC .length , magic , 0 , AVRO_MAGIC .length );
113107
114108 if (validateMagic != 0 ) {
115109 throw new RuntimeException ("Invalid AVRO data file: The file is not an Avro file" );
@@ -126,7 +120,7 @@ void readHeader() throws IOException {
126120 ByteBuffer valueBuffer = decoder .readBytes (null );
127121
128122 headerSize += zigzagSize (keyBuffer .remaining ()) + keyBuffer .remaining ();
129- headerSize += zigzagSize (valueBuffer .remaining ()) + valueBuffer .remaining ();
123+ headerSize += zigzagSize (valueBuffer .remaining ()) + valueBuffer .remaining ();
130124
131125 String key = new String (keyBuffer .array (), StandardCharsets .UTF_8 );
132126
@@ -186,8 +180,7 @@ private String processCodec(ByteBuffer buffer) {
186180
187181 if (buffer != null && buffer .remaining () > 0 ) {
188182 return new String (buffer .array (), StandardCharsets .UTF_8 );
189- }
190- else {
183+ } else {
191184 return DataFileConstants .NULL_CODEC ;
192185 }
193186 }
@@ -242,22 +235,24 @@ boolean readBatch() throws IOException {
242235 decoder .readFixed (batchSyncMarker );
243236
244237 long batchSize =
245- zigzagSize (nRows ) +
246- zigzagSize (batchBuffer .remaining ()) +
247- batchBuffer .remaining () +
248- SYNC_MARKER_SIZE ;
238+ zigzagSize (nRows )
239+ + zigzagSize (batchBuffer .remaining ())
240+ + batchBuffer .remaining ()
241+ + SYNC_MARKER_SIZE ;
249242
250243 // Validate sync marker - mismatch indicates a corrupt file
251- int validateMarker = BinaryData . compareBytes (
252- syncMarker , 0 , SYNC_MARKER_SIZE ,
253- batchSyncMarker , 0 , SYNC_MARKER_SIZE );
244+ int validateMarker =
245+ BinaryData . compareBytes (
246+ syncMarker , 0 , SYNC_MARKER_SIZE , batchSyncMarker , 0 , SYNC_MARKER_SIZE );
254247
255248 if (validateMarker != 0 ) {
256249 throw new RuntimeException ("Invalid AVRO data file: The file is corrupted" );
257250 }
258251
259252 // Reset consumers: ensure vector capacity for the batch and rewind positions to 0
260- recordConsumer .getConsumers ().forEach (consumer -> ensureCapacity (consumer .getVector (), (int ) nRows ));
253+ recordConsumer
254+ .getConsumers ()
255+ .forEach (consumer -> ensureCapacity (consumer .getVector (), (int ) nRows ));
261256 recordConsumer .getConsumers ().forEach (consumer -> consumer .setPosition (0 ));
262257
263258 // Decompress the batch buffer using Avro's codecs
@@ -299,7 +294,7 @@ boolean hasNextBatch() throws IOException {
299294 }
300295
301296 if (blocking ) {
302- return ! decoder .isEnd ();
297+ return !decoder .isEnd ();
303298 }
304299
305300 var in = decoder .inputStream ();
@@ -311,8 +306,7 @@ boolean hasNextBatch() throws IOException {
311306 in .reset ();
312307
313308 return nextByte >= 0 ;
314- }
315- catch (EOFException e ) {
309+ } catch (EOFException e ) {
316310 return false ;
317311 }
318312 }
0 commit comments