@@ -4,13 +4,13 @@ use std::{io::Read, sync::Arc};
44use arrow:: array:: * ;
55use arrow:: buffer:: Buffer ;
66use arrow:: datatypes:: { DataType , DateUnit , Field , IntervalUnit , Schema , TimeUnit , ToByteSlice } ;
7- use arrow:: record_batch:: RecordBatch ;
7+ use arrow:: record_batch:: { RecordBatch , RecordBatchReader } ;
88use byteorder:: { LittleEndian , NetworkEndian , ReadBytesExt } ;
99use chrono:: Timelike ;
1010use postgres:: types:: * ;
1111use postgres:: { Client , NoTls , Row } ;
1212
13- use super :: { Postgres , EPOCH_DAYS , EPOCH_MICROS , MAGIC } ;
13+ use super :: { Postgres , PostgresReadIterator , EPOCH_DAYS , EPOCH_MICROS , MAGIC } ;
1414use crate :: error:: DataFrameError ;
1515use crate :: io:: sql:: SqlDataSource ;
1616
@@ -112,6 +112,88 @@ impl SqlDataSource for Postgres {
112112 }
113113}
114114
115+ impl PostgresReadIterator {
116+ /// Create a new Postgres reader
117+ pub fn try_new (
118+ connection : & str ,
119+ query : & str ,
120+ limit : Option < usize > ,
121+ batch_size : usize ,
122+ ) -> crate :: error:: Result < Self > {
123+ let mut client = Client :: connect ( connection, NoTls ) . unwrap ( ) ;
124+ // get schema
125+ let row = client
126+ . query_one (
127+ format ! ( "select a.* from ({}) a limit 1" , query) . as_str ( ) ,
128+ & [ ] ,
129+ )
130+ . unwrap ( ) ;
131+ let schema = row_to_schema ( & row) . expect ( "Unable to get schema from row" ) ;
132+ Ok ( Self {
133+ client,
134+ query : query. to_string ( ) ,
135+ limit : limit. unwrap_or_else ( || std:: usize:: MAX ) ,
136+ batch_size,
137+ schema,
138+ read_records : 0 ,
139+ is_complete : false ,
140+ } )
141+ }
142+
143+ /// Read the next batch
144+ fn read_batch ( & mut self ) -> crate :: error:: Result < Option < RecordBatch > > {
145+ if self . is_complete {
146+ return Ok ( None ) ;
147+ }
148+ let reader = get_binary_reader (
149+ & mut self . client ,
150+ format ! (
151+ "select a.* from ({}) a limit {} offset {}" ,
152+ self . query, self . batch_size, self . read_records
153+ )
154+ . as_str ( ) ,
155+ ) ?;
156+ let batch = read_from_binary ( reader, & self . schema ) ?;
157+ println ! (
158+ "Read {} records from offset {}" ,
159+ batch. num_rows( ) ,
160+ self . read_records
161+ ) ;
162+ self . read_records += batch. num_rows ( ) ;
163+ if batch. num_rows ( ) == 0 {
164+ self . is_complete = true ;
165+ return Ok ( None ) ;
166+ } else if self . read_records >= self . limit {
167+ self . is_complete = true ;
168+ return Ok ( Some ( batch) ) ;
169+ }
170+ Ok ( Some ( batch) )
171+ }
172+
173+ /// Get a reference to the table's schema
174+ pub fn schema ( & self ) -> & Schema {
175+ & self . schema
176+ }
177+ }
178+
179+ impl RecordBatchReader for PostgresReadIterator {
180+ fn schema ( & mut self ) -> arrow:: datatypes:: SchemaRef {
181+ Arc :: new ( self . schema . clone ( ) )
182+ }
183+ fn next_batch ( & mut self ) -> arrow:: error:: Result < Option < RecordBatch > > {
184+ self . read_batch ( ) . map_err ( |_| {
185+ arrow:: error:: ArrowError :: IoError ( "Unable to read record batch" . to_string ( ) )
186+ } )
187+ }
188+ }
189+
190+ impl Iterator for PostgresReadIterator {
191+ type Item = crate :: error:: Result < RecordBatch > ;
192+ fn next ( & mut self ) -> Option < Self :: Item > {
193+ self . read_batch ( ) . transpose ( )
194+ }
195+ }
196+
115197fn get_binary_reader < ' a > (
116198 client : & ' a mut Client ,
117199 query : & str ,
0 commit comments