1919import org .apache .commons .csv .CSVParser ;
2020import org .apache .commons .csv .CSVRecord ;
2121import org .apache .commons .lang3 .time .StopWatch ;
22- import org .slf4j .Logger ;
23- import org .slf4j .LoggerFactory ;
2422
23+ import lombok .extern .slf4j .Slf4j ;
2524import reactor .core .publisher .Flux ;
2625import reactor .core .scheduler .Schedulers ;
2726
27+ @ Slf4j
2828public abstract class EbirdCsvParser
2929{
30- private static final Logger logger = LoggerFactory .getLogger (EbirdCsvParser .class );
31-
3230 public enum ParseMode {SINGLE_THREAD ,MULTI_THREAD }
3331
3432 public enum PreSort {NONE ,DATE }
@@ -37,14 +35,16 @@ public enum PreSort {NONE,DATE}
3735
3836 private static final AtomicInteger linesProcessed = new AtomicInteger (0 );
3937
38+ private static final int ROW_PREFETCH = 25000 ;
39+
4040 /**
4141 * Parses the date and time fields from a CSV record and returns a LocalDateTime object representing the combined datetime value.
4242 * If the time is not defined, assumes midnight.
4343 *
4444 * @param record The CSVRecord representing a single row of data in the CSV file.
4545 * @return A LocalDateTime object representing the combined date and time parsed from the CSV record.
4646 */
47- static final LocalDateTime parseSubDate (CSVRecord record )
47+ private static LocalDateTime parseSubDate (CSVRecord record )
4848 {
4949 if (record .getRecordNumber () == 1l )
5050 return LocalDateTime .MIN ;
@@ -64,7 +64,7 @@ static final LocalDateTime parseSubDate(CSVRecord record)
6464 * @param record The CSVRecord representing a single row of data in the CSV file.
6565 * @return An EbirdCsvRow object constructed from the CSV record.
6666 */
67- private static final EbirdCsvRow parseCsvLine (CSVRecord record )
67+ private static EbirdCsvRow parseCsvLine (CSVRecord record )
6868 {
6969 if (record .getRecordNumber () == 1l )
7070 return null ; // skip the header
@@ -136,7 +136,7 @@ private static final EbirdCsvRow parseCsvLine(CSVRecord record)
136136 */
137137 public static final void parseCsv (Path csvFile ,Consumer <EbirdCsvRow > rowProcessor ,ParseMode mode ,PreSort preSort ) throws IOException
138138 {
139- logger .info ("Parsing " + csvFile + " ..." );
139+ log .info ("Parsing {} ..." , csvFile );
140140
141141 linesProcessed .set (0 );
142142
@@ -146,19 +146,19 @@ public static final void parseCsv(Path csvFile,Consumer<EbirdCsvRow> rowProcesso
146146
147147 StopWatch stopwatch = StopWatch .createStarted ();
148148
149- Iterable <CSVRecord > records ;
149+ Flux <CSVRecord > recordsFlux ;
150150 if (PreSort .DATE == preSort )
151151 {
152152 // Read all lines and sort by date and time columns
153153 List <CSVRecord > recordsList = csvParser .getRecords ();
154154 recordsList .sort (Comparator .comparing (EbirdCsvParser ::parseSubDate ));
155- logger .debug ("Read and sorted " + ( recordsList .size ()-1 ) + " eBird observations in " + stopwatch .getTime (TimeUnit .SECONDS ) + " seconds" );
156- records = recordsList ;
155+ log .debug ("Read and sorted {} eBird observations in {} seconds" , recordsList .size ()-1 , stopwatch .getTime (TimeUnit .SECONDS ));
156+ recordsFlux = Flux . fromIterable ( recordsList ) ;
157157 }
158158 else
159- records = csvParser ;
159+ recordsFlux = Flux . fromIterable ( csvParser ) ;
160160
161- Consumer <CSVRecord > csvRecordConsumer = new Consumer <CSVRecord >() {
161+ final Consumer <CSVRecord > csvRecordConsumer = new Consumer <CSVRecord >() {
162162 @ Override
163163 public void accept (CSVRecord record )
164164 {
@@ -174,15 +174,16 @@ public void accept(CSVRecord record)
174174 switch (mode )
175175 {
176176 case MULTI_THREAD :
177- Flux . fromIterable ( records ). parallel ().runOn (Schedulers .parallel ()).sequential (25000 ).doOnNext (csvRecordConsumer ).then ().block ();
177+ recordsFlux . parallel ().runOn (Schedulers .parallel ()).sequential (ROW_PREFETCH ).doOnNext (csvRecordConsumer ).then ().block ();
178178 break ;
179179 case SINGLE_THREAD :
180- Flux . fromIterable ( records ) .doOnNext (csvRecordConsumer ).then ().block ();
180+ recordsFlux .doOnNext (csvRecordConsumer ).then ().block ();
181181 break ;
182182 }
183183
184184 stopwatch .stop ();
185- logger .info ("Processed " + linesProcessed .get () + " eBird observations in " + stopwatch .getTime (TimeUnit .SECONDS ) + " seconds" );
185+
186+ log .info ("Processed {} eBird observations in {} seconds" ,linesProcessed .get (),stopwatch .getTime (TimeUnit .SECONDS ));
186187 }
187188 }
188189
0 commit comments