@@ -568,6 +568,63 @@ impl<R: ChunkReader> SerializedPageReader<R> {
             physical_type: meta.column_type(),
         })
     }
+
+    /// Similar to `peek_next_page`, but returns the offset of the next page instead of its metadata.
+    /// Unlike page metadata, an offset uniquely identifies a page.
+    ///
+    /// This is used when reading Parquet with a row filter, where we want to avoid decompressing the same page twice.
+    /// It lets us check whether the next page has already been cached or read.
+    #[cfg(test)]
+    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
+        match &mut self.state {
+            SerializedPageReaderState::Values {
+                offset,
+                remaining_bytes,
+                next_page_header,
+            } => {
+                loop {
+                    if *remaining_bytes == 0 {
+                        return Ok(None);
+                    }
+                    return if let Some(header) = next_page_header.as_ref() {
+                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
+                            Ok(Some(*offset))
+                        } else {
+                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                            *next_page_header = None;
+                            continue;
+                        }
+                    } else {
+                        let mut read = self.reader.get_read(*offset as u64)?;
+                        let (header_len, header) = read_page_header_len(&mut read)?;
+                        *offset += header_len;
+                        *remaining_bytes -= header_len;
+                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
+                            Ok(Some(*offset))
+                        } else {
+                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                            continue;
+                        };
+                        *next_page_header = Some(Box::new(header));
+                        page_meta
+                    };
+                }
+            }
+            SerializedPageReaderState::Pages {
+                page_locations,
+                dictionary_page,
+                ..
+            } => {
+                if let Some(page) = dictionary_page {
+                    Ok(Some(page.offset as usize))
+                } else if let Some(page) = page_locations.front() {
+                    Ok(Some(page.offset as usize))
+                } else {
+                    Ok(None)
+                }
+            }
+        }
+    }
 }
 
 impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
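The new method is motivated by page caching under row filters: because the returned offset uniquely identifies a page, a reader that has to scan the same column chunk more than once can key a cache of decompressed pages by that offset and skip the second decompression. The sketch below is not part of this patch and only illustrates that pattern; the `PageCache` type, the `Vec<u8>` page bytes, and the fixed offset are all hypothetical stand-ins.

```rust
use std::collections::HashMap;

/// Hypothetical cache of decompressed page bytes, keyed by page offset.
/// The offset acts as a unique page id, which is what `peek_next_page_offset`
/// provides and page metadata alone cannot.
struct PageCache {
    pages: HashMap<usize, Vec<u8>>,
}

impl PageCache {
    fn new() -> Self {
        Self {
            pages: HashMap::new(),
        }
    }

    /// Returns the bytes cached for `offset`, running `decompress` only on a miss.
    fn get_or_decompress<F>(&mut self, offset: usize, decompress: F) -> &[u8]
    where
        F: FnOnce() -> Vec<u8>,
    {
        self.pages.entry(offset).or_insert_with(decompress).as_slice()
    }
}

fn main() {
    let mut cache = PageCache::new();
    // In the reader, `offset` would come from `peek_next_page_offset()`;
    // here two passes over the same page are simulated with a fixed offset.
    let offset = 4096usize;
    let first = cache.get_or_decompress(offset, || vec![1, 2, 3]).to_vec();
    let second = cache
        .get_or_decompress(offset, || unreachable!("second pass must hit the cache"))
        .to_vec();
    assert_eq!(first, second);
}
```

Within the patch itself, `peek_next_page_offset` is gated behind `#[cfg(test)]` and is only exercised by the test added further down, which checks that every peeked offset matches the page that is actually read next.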
@@ -802,6 +859,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashSet;
+
     use bytes::Buf;
 
     use crate::file::properties::{EnabledStatistics, WriterProperties};
@@ -1107,6 +1166,89 @@ mod tests {
         assert_eq!(page_count, 2);
     }
 
+    fn get_serialized_page_reader<R: ChunkReader>(
+        file_reader: &SerializedFileReader<R>,
+        row_group: usize,
+        column: usize,
+    ) -> Result<SerializedPageReader<R>> {
+        let row_group = {
+            let row_group_metadata = file_reader.metadata.row_group(row_group);
+            let props = Arc::clone(&file_reader.props);
+            let f = Arc::clone(&file_reader.chunk_reader);
+            SerializedRowGroupReader::new(
+                f,
+                row_group_metadata,
+                file_reader
+                    .metadata
+                    .offset_index()
+                    .map(|x| x[row_group].as_slice()),
+                props,
+            )?
+        };
+
+        let col = row_group.metadata.column(column);
+
+        let page_locations = row_group
+            .offset_index
+            .map(|x| x[column].page_locations.clone());
+
+        let props = Arc::clone(&row_group.props);
+        SerializedPageReader::new_with_properties(
+            Arc::clone(&row_group.chunk_reader),
+            col,
+            row_group.metadata.num_rows() as usize,
+            page_locations,
+            props,
+        )
+    }
+
+    #[test]
+    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let reader = SerializedFileReader::new(test_file)?;
+
+        let mut offset_set = HashSet::new();
+        let num_row_groups = reader.metadata.num_row_groups();
+        for row_group in 0..num_row_groups {
+            let num_columns = reader.metadata.row_group(row_group).num_columns();
+            for column in 0..num_columns {
+                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
+
+                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
+                    match &page_reader.state {
+                        SerializedPageReaderState::Pages {
+                            page_locations,
+                            dictionary_page,
+                            ..
+                        } => {
+                            if let Some(page) = dictionary_page {
+                                assert_eq!(page.offset as usize, page_offset);
+                            } else if let Some(page) = page_locations.front() {
+                                assert_eq!(page.offset as usize, page_offset);
+                            } else {
+                                unreachable!()
+                            }
+                        }
+                        SerializedPageReaderState::Values {
+                            offset,
+                            next_page_header,
+                            ..
+                        } => {
+                            assert!(next_page_header.is_some());
+                            assert_eq!(*offset, page_offset);
+                        }
+                    }
+                    let page = page_reader.get_next_page()?;
+                    assert!(page.is_some());
+                    let newly_inserted = offset_set.insert(page_offset);
+                    assert!(newly_inserted);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn test_page_iterator() {
         let file = get_test_file("alltypes_plain.parquet");
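Note that `test_peek_next_page_offset_matches_actual`, like the surrounding tests that call `get_test_file`, reads `alltypes_plain.parquet` from the Parquet test data, so running it locally assumes that data is available (in arrow-rs this normally means the `parquet-testing` checkout with `PARQUET_TEST_DATA` pointing at its `data` directory); it then runs like any other unit test in the crate.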