@@ -114,19 +114,26 @@ impl ListingTableConfig {
114114 }
115115 }
116116
117- fn infer_file_extension ( path : & str ) -> Result < String > {
117+ /// Returns a tuple of (file_extension, optional compression_extension)
118+ ///
119+ /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
120+ /// For example a path ending with blah.test.csv returns `("csv", None)`
121+ fn infer_file_extension_and_compression_type (
122+ path : & str ,
123+ ) -> Result < ( String , Option < String > ) > {
118124 let mut exts = path. rsplit ( '.' ) ;
119125
120- let mut splitted = exts. next ( ) . unwrap_or ( "" ) ;
126+ let splitted = exts. next ( ) . unwrap_or ( "" ) ;
121127
122128 let file_compression_type = FileCompressionType :: from_str ( splitted)
123129 . unwrap_or ( FileCompressionType :: UNCOMPRESSED ) ;
124130
125131 if file_compression_type. is_compressed ( ) {
126- splitted = exts. next ( ) . unwrap_or ( "" ) ;
132+ let splitted2 = exts. next ( ) . unwrap_or ( "" ) ;
133+ Ok ( ( splitted2. to_string ( ) , Some ( splitted. to_string ( ) ) ) )
134+ } else {
135+ Ok ( ( splitted. to_string ( ) , None ) )
127136 }
128-
129- Ok ( splitted. to_string ( ) )
130137 }
131138
132139 /// Infer `ListingOptions` based on `table_path` suffix.
@@ -147,18 +154,33 @@ impl ListingTableConfig {
147154 . await
148155 . ok_or_else ( || DataFusionError :: Internal ( "No files for table" . into ( ) ) ) ??;
149156
150- let file_extension =
151- ListingTableConfig :: infer_file_extension ( file. location . as_ref ( ) ) ?;
157+ let ( file_extension, maybe_compression_type) =
158+ ListingTableConfig :: infer_file_extension_and_compression_type (
159+ file. location . as_ref ( ) ,
160+ ) ?;
161+
162+ let mut format_options = HashMap :: new ( ) ;
163+ if let Some ( ref compression_type) = maybe_compression_type {
164+ format_options
165+ . insert ( "format.compression" . to_string ( ) , compression_type. clone ( ) ) ;
166+ }
152167
153168 let file_format = state
154169 . get_file_format_factory ( & file_extension)
155170 . ok_or ( config_datafusion_err ! (
156171 "No file_format found with extension {file_extension}"
157172 ) ) ?
158- . create ( state, & HashMap :: new ( ) ) ?;
173+ . create ( state, & format_options) ?;
174+
175+ let listing_file_extension =
176+ if let Some ( compression_type) = maybe_compression_type {
177+ format ! ( "{}.{}" , & file_extension, & compression_type)
178+ } else {
179+ file_extension
180+ } ;
159181
160182 let listing_options = ListingOptions :: new ( file_format)
161- . with_file_extension ( file_extension )
183+ . with_file_extension ( listing_file_extension )
162184 . with_target_partitions ( state. config ( ) . target_partitions ( ) ) ;
163185
164186 Ok ( Self {
@@ -2194,4 +2216,23 @@ mod tests {
21942216
21952217 Ok ( ( ) )
21962218 }
2219+
2220+ #[ tokio:: test]
2221+ async fn test_infer_options_compressed_csv ( ) -> Result < ( ) > {
2222+ let testdata = crate :: test_util:: arrow_test_data ( ) ;
2223+ let filename = format ! ( "{}/csv/aggregate_test_100.csv.gz" , testdata) ;
2224+ let table_path = ListingTableUrl :: parse ( filename) . unwrap ( ) ;
2225+
2226+ let ctx = SessionContext :: new ( ) ;
2227+
2228+ let config = ListingTableConfig :: new ( table_path) ;
2229+ let config_with_opts = config. infer_options ( & ctx. state ( ) ) . await ?;
2230+ let config_with_schema = config_with_opts. infer_schema ( & ctx. state ( ) ) . await ?;
2231+
2232+ let schema = config_with_schema. file_schema . unwrap ( ) ;
2233+
2234+ assert_eq ! ( schema. fields. len( ) , 13 ) ;
2235+
2236+ Ok ( ( ) )
2237+ }
21972238}
0 commit comments