 from io import BufferedWriter, BytesIO
 from typing import Any, List, Optional, Sequence, Tuple, Union

-import pandas as pd
+import duckdb
+from duckdb import DuckDBPyConnection, DuckDBPyRelation

 from countess import VERSION
 from countess.core.parameters import (
     MultiParam,
     StringParam,
 )
-from countess.core.plugins import PandasInputFilesPlugin, PandasOutputPlugin
+from countess.core.plugins import DuckdbLoadFilePlugin, DuckdbSaveFilePlugin
 from countess.utils.files import clean_filename
-from countess.utils.pandas import flatten_columns
+from countess.utils.duckdb import duckdb_escape_literal, duckdb_escape_identifier

 CSV_FILE_TYPES: Sequence[Tuple[str, Union[str, List[str]]]] = [
     ("CSV", [".csv", ".csv.gz"]),
 class ColumnsMultiParam(MultiParam):
     name = StringParam("Column Name", "")
     type = DataTypeOrNoneChoiceParam("Column Type")
-    index = BooleanParam("Index?", False)


-class LoadCsvPlugin(PandasInputFilesPlugin):
+CSV_DELIMITER_CHOICES = {
+    ',': ',',
+    ';': ';',
+    '|': '|',
+    'TAB': '\t',
+    'SPACE': ' ',
+    'NONE': None,
+}
+
+class LoadCsvPlugin(DuckdbLoadFilePlugin):
     """Load CSV files"""

     name = "CSV Load"
@@ -46,78 +55,42 @@ class LoadCsvPlugin(PandasInputFilesPlugin):
     version = VERSION
     file_types = CSV_FILE_TYPES

-    delimiter = ChoiceParam("Delimiter", ",", choices=[",", ";", "TAB", "|", "WHITESPACE"])
-    quoting = ChoiceParam("Quoting", "None", choices=["None", "Double-Quote", "Quote with Escape"])
-    comment = ChoiceParam("Comment", "None", choices=["None", "#", ";"])
+    delimiter = ChoiceParam("Delimiter", ",", choices=CSV_DELIMITER_CHOICES.keys())
     header = BooleanParam("CSV file has header row?", True)
     filename_column = StringParam("Filename Column", "")
     columns = ArrayParam("Columns", ColumnsMultiParam("Column"))

-    def read_file_to_dataframe(self, filename: str, file_param: BaseParam, row_limit=None):
-        options: dict[str, Any] = {
-            "header": 0 if self.header else None,
-        }
-        if row_limit is not None:
-            options["nrows"] = row_limit
-
-        index_col_numbers = []
-
-        if len(self.columns):
-            options["names"] = []
-            options["usecols"] = []
-            options["converters"] = {}
-
-            for n, pp in enumerate(self.columns):
-                options["names"].append(str(pp.name) or f"column_{n}")
-                if pp.type.is_not_none():
-                    if pp.index:
-                        index_col_numbers.append(len(options["usecols"]))
-                    options["usecols"].append(n)
-                    options["converters"][n] = pp["type"].cast_value
-
-        if self.delimiter == "TAB":
-            options["delimiter"] = "\t"
-        elif self.delimiter == "WHITESPACE":
-            options["delim_whitespace"] = True
+    def load_file(
+        self, cursor: DuckDBPyConnection, filename: str, file_param: BaseParam, file_number: int
+    ) -> duckdb.DuckDBPyRelation:
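+        # First pass with no columns configured: let DuckDB sniff the header
+        # names and types, then record them in self.columns for later runs.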
+        if self.header and len(self.columns) == 0:
+            table = cursor.read_csv(
+                filename,
+                header=True,
+                delimiter=CSV_DELIMITER_CHOICES[self.delimiter.value],
+            )
+            for column_name, column_dtype in zip(table.columns, table.dtypes):
+                column_param = self.columns.add_row()
+                column_param.name.value = column_name
+                column_param.type.value = str(column_dtype)
         else:
-            options["delimiter"] = str(self.delimiter)
-
-        if self.quoting == "None":
-            options["quoting"] = csv.QUOTE_NONE
-        elif self.quoting == "Double-Quote":
-            options["quotechar"] = '"'
-            options["doublequote"] = True
-        elif self.quoting == "Quote with Escape":
-            options["quotechar"] = '"'
-            options["doublequote"] = False
-            options["escapechar"] = "\\"
-
-        if self.comment.value != "None":
-            options["comment"] = str(self.comment)
-
-        # XXX pd.read_csv(index_col=) is half the speed of pd.read_csv().set_index()
-
-        df = pd.read_csv(filename, **options)
-
-        while len(df.columns) > len(self.columns):
-            self.columns.add_row()
-
-        if self.header:
-            for n, col in enumerate(df.columns):
-                if not self.columns[n].name:
-                    self.columns[n].name = str(col)
-                    self.columns[n].type = "string"
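+            # Columns already configured (or no header to sniff): re-read with
+            # an explicit schema, skipping the original header row if present.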
+            table = cursor.read_csv(
+                filename,
+                header=False,
+                skiprows=1 if self.header else 0,
+                delimiter=CSV_DELIMITER_CHOICES[self.delimiter.value],
+                columns={str(c.name): str(c.type) for c in self.columns} if self.columns else None,
+            )

         if self.filename_column:
-            df[str(self.filename_column)] = clean_filename(filename)
-
-        if index_col_numbers:
-            df = df.set_index([df.columns[n] for n in index_col_numbers])
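+            # Append the cleaned filename as a constant column, escaping both
+            # the literal value and the identifier before splicing them into SQL.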
+            escaped_filename = duckdb_escape_literal(clean_filename(filename))
+            escaped_column = duckdb_escape_identifier(self.filename_column.value)
+            table = table.project(f"*, {escaped_filename} AS {escaped_column}")

-        return df
+        return table


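# A minimal standalone sketch (editor's illustration, not part of this commit)
# of the DuckDB calls used in load_file() above. "test.csv" and the column
# schema are hypothetical; read_csv and project are standard DuckDB Python API.
import duckdb

con = duckdb.connect()

# Explicit-schema read, as in the else branch: header sniffing is disabled,
# the real header row is skipped, and column names/types are supplied.
rel = con.read_csv(
    "test.csv",
    header=False,
    skiprows=1,
    delimiter=",",
    columns={"sequence": "VARCHAR", "count": "BIGINT"},
)

# Appending a constant filename column, as in the filename_column branch.
# duckdb_escape_literal / duckdb_escape_identifier live in countess.utils.duckdb;
# plain SQL quote-doubling is assumed here as an equivalent.
literal = "'" + "input1.csv".replace("'", "''") + "'"
identifier = '"' + "filename".replace('"', '""') + '"'
rel = rel.project(f"*, {literal} AS {identifier}")
print(rel.limit(5))
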
-class SaveCsvPlugin(PandasOutputPlugin):
+class SaveCsvPlugin(DuckdbSaveFilePlugin):
     name = "CSV Save"
     description = "Save data as CSV or similar delimited text files"
     link = "https://countess-project.github.io/CountESS/included-plugins/#csv-writer"
@@ -135,61 +108,5 @@ class SaveCsvPlugin(PandasOutputPlugin):
     SEPARATORS = {",": ",", ";": ";", "SPACE": " ", "TAB": "\t"}
     QUOTING = {False: csv.QUOTE_MINIMAL, True: csv.QUOTE_NONNUMERIC}

-    def prepare(self, sources: list[str], row_limit: Optional[int] = None):
-        if row_limit is None:
-            logger.debug("SaveCsvPlugin.process %s prepare %s", self.name, self.filename)
-            filename = str(self.filename)
-            if filename.endswith(".gz"):
-                self.filehandle = gzip.open(filename, "wb")
-            elif filename.endswith(".bz2"):
-                self.filehandle = bz2.open(filename, "wb")
-            else:
-                self.filehandle = open(filename, "wb")
-        else:
-            logger.debug("SaveCsvPlugin.process %s prepare BytesIO", self.name)
-            self.filehandle = BytesIO()
-
-        self.csv_columns = None
-
-    def process(self, data: pd.DataFrame, source: str):
-        # reset indexes so we can treat all columns equally.
-        # if there's just a nameless index then we don't care about it, drop it.
-        drop_index = data.index.name is None and data.index.names[0] is None
-        dataframe = flatten_columns(data.reset_index(drop=drop_index))
-
-        # if this is our first dataframe to write then decide whether to
-        # include the header or not.
-        if self.csv_columns is None:
-            self.csv_columns = list(dataframe.columns)
-            emit_header = bool(self.header)
-        else:
-            # add in any columns we haven't seen yet in previous dataframes.
-            for c in dataframe.columns:
-                if c not in self.csv_columns:
-                    self.csv_columns.append(c)
-                    logger.warning("Added CSV Column %s with no header", repr(c))
-            # fill in blanks for any columns which are in previous dataframes but not
-            # in this one.
-            dataframe = dataframe.assign(**{c: None for c in self.csv_columns if c not in dataframe.columns})
-            emit_header = False
-
-        logger.debug(
-            "SaveCsvPlugin.process %s writing rows %d columns %d", self.name, len(dataframe), len(self.csv_columns)
-        )
-
-        dataframe.to_csv(
-            self.filehandle,
-            header=emit_header,
-            columns=self.csv_columns,
-            index=False,
-            sep=self.SEPARATORS[str(self.delimiter)],
-            quoting=self.QUOTING[bool(self.quoting)],
-        )  # type: ignore[call-overload]
-        return []
-
-    def finalize(self):
-        logger.debug("SaveCsvPlugin.process %s finalize", self.name)
-        if isinstance(self.filehandle, BytesIO):
-            yield self.filehandle.getvalue().decode("utf-8")
-        else:
-            self.filehandle.close()
+    def execute(self, ddbc: DuckDBPyConnection, source: Optional[DuckDBPyRelation]) -> Optional[DuckDBPyRelation]:
+        pass
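
# A minimal sketch (editor's illustration; the commit itself leaves execute()
# as a stub) of how the save side could use DuckDB's CSV writer. It assumes
# the filename, delimiter and header parameters SaveCsvPlugin already declares,
# plus DuckDBPyRelation.write_csv:

def execute_sketch(self, ddbc: DuckDBPyConnection, source: Optional[DuckDBPyRelation]) -> Optional[DuckDBPyRelation]:
    if source is not None:
        # DuckDB infers gzip/zstd compression from the file extension,
        # replacing the gzip/bz2 filehandle logic removed above.
        source.write_csv(
            str(self.filename),
            sep=self.SEPARATORS[str(self.delimiter)],
            header=bool(self.header),
        )
    return None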