1818 */
1919package org .apache .iceberg .parquet ;
2020
21+ import static org .apache .parquet .avro .AvroReadSupport .AVRO_REQUESTED_PROJECTION ;
22+
2123import java .util .Map ;
2224import java .util .Set ;
2325import org .apache .hadoop .conf .Configuration ;
2729import org .apache .iceberg .relocated .com .google .common .collect .ImmutableMap ;
2830import org .apache .iceberg .relocated .com .google .common .collect .Sets ;
2931import org .apache .parquet .avro .AvroReadSupport ;
32+ import org .apache .parquet .conf .ParquetConfiguration ;
3033import org .apache .parquet .hadoop .api .InitContext ;
3134import org .apache .parquet .hadoop .api .ReadSupport ;
3235import org .apache .parquet .io .api .RecordMaterializer ;
@@ -104,6 +107,60 @@ public ReadContext init(
104107 projection , context != null ? context .getReadSupportMetadata () : ImmutableMap .of ());
105108 }
106109
110+ @ Override
111+ @ SuppressWarnings ("deprecation" )
112+ public ReadContext init (
113+ ParquetConfiguration configuration ,
114+ Map <String , String > keyValueMetaData ,
115+ MessageType fileSchema ) {
116+ // Columns are selected from the Parquet file by taking the read context's message type and
117+ // matching to the file's columns by full path, so this must select columns by using the path
118+ // in the file's schema.
119+
120+ MessageType projection ;
121+ if (ParquetSchemaUtil .hasIds (fileSchema )) {
122+ projection = ParquetSchemaUtil .pruneColumns (fileSchema , expectedSchema );
123+ } else if (nameMapping != null ) {
124+ MessageType typeWithIds = ParquetSchemaUtil .applyNameMapping (fileSchema , nameMapping );
125+ projection = ParquetSchemaUtil .pruneColumns (typeWithIds , expectedSchema );
126+ } else {
127+ projection = ParquetSchemaUtil .pruneColumnsFallback (fileSchema , expectedSchema );
128+ }
129+
130+ // override some known backward-compatibility options
131+ configuration .set ("parquet.strict.typing" , "false" );
132+ configuration .set ("parquet.avro.add-list-element-records" , "false" );
133+ configuration .set ("parquet.avro.write-old-list-structure" , "false" );
134+
135+ // set Avro schemas in case the reader is Avro
136+ configuration .set (
137+ AVRO_REQUESTED_PROJECTION ,
138+ AvroSchemaUtil .convert (expectedSchema , projection .getName ()).toString ());
139+ org .apache .avro .Schema avroReadSchema =
140+ AvroSchemaUtil .buildAvroProjection (
141+ AvroSchemaUtil .convert (ParquetSchemaUtil .convert (projection ), projection .getName ()),
142+ expectedSchema ,
143+ ImmutableMap .of ());
144+ configuration .set (
145+ "parquet.avro.read.schema" , ParquetAvro .parquetAvroSchema (avroReadSchema ).toString ());
146+
147+ // let the context set up read support metadata, but always use the correct projection
148+ ReadContext context = null ;
149+ if (callInit ) {
150+ try {
151+ context = wrapped .init (configuration , keyValueMetaData , projection );
152+ } catch (UnsupportedOperationException e ) {
153+ // try the InitContext version
154+ context =
155+ wrapped .init (
156+ new InitContext (configuration , makeMultimap (keyValueMetaData ), projection ));
157+ }
158+ }
159+
160+ return new ReadContext (
161+ projection , context != null ? context .getReadSupportMetadata () : ImmutableMap .of ());
162+ }
163+
107164 @ Override
108165 public RecordMaterializer <T > prepareForRead (
109166 Configuration configuration ,
@@ -119,6 +176,21 @@ public RecordMaterializer<T> prepareForRead(
119176 return wrapped .prepareForRead (configuration , fileMetadata , readSchema , readContext );
120177 }
121178
179+ @ Override
180+ public RecordMaterializer <T > prepareForRead (
181+ ParquetConfiguration configuration ,
182+ Map <String , String > fileMetadata ,
183+ MessageType fileMessageType ,
184+ ReadContext readContext ) {
185+ // This is the type created in init that was based on the file's schema. The schema that this
186+ // will pass to the wrapped ReadSupport needs to match the expected schema's names. Rather than
187+ // renaming the file's schema, convert the expected schema to Parquet. This relies on writing
188+ // files with the correct schema.
189+ // TODO: this breaks when columns are reordered.
190+ MessageType readSchema = ParquetSchemaUtil .convert (expectedSchema , fileMessageType .getName ());
191+ return wrapped .prepareForRead (configuration , fileMetadata , readSchema , readContext );
192+ }
193+
122194 private Map <String , Set <String >> makeMultimap (Map <String , String > map ) {
123195 ImmutableMap .Builder <String , Set <String >> builder = ImmutableMap .builder ();
124196 for (Map .Entry <String , String > entry : map .entrySet ()) {
0 commit comments