1616
1717package org .opencb .commons ;
1818
19+ import org .apache .commons .lang3 .tuple .Pair ;
1920import org .opencb .commons .run .Task ;
2021import org .slf4j .Logger ;
2122import org .slf4j .LoggerFactory ;
2223
2324import java .text .DecimalFormat ;
25+ import java .util .LinkedList ;
2426import java .util .List ;
2527import java .util .concurrent .*;
2628import java .util .concurrent .atomic .AtomicLong ;
@@ -43,33 +45,43 @@ public class ProgressLogger {
4345
// Text written at the start of every log line, before the current count.
4446 private final String message ;
// Divisor used by updateBatchSize() to derive batchSize from totalCount.
4547 private final int numLinesLog ;
// Time since the previous log line (in milliseconds) after which an increment triggers a new one.
// The constructor falls back to one minute when no positive value is given.
48+ private final long logFrequencyMillis ;
// When false, log(...) leaves out the elapsed-time and progress-rate section.
49+ private boolean progressRateEnabled = true ;
// Window, in seconds, used for the recent progress rate; set to 60 by the constructor.
50+ private long progressRateWindowSizeSeconds ;
51+ private boolean progressRateMillionHours = false ; // If true, progress rate is in millions of elements per hour
// Total used by log(...) for the "count/total" output; values not greater than 0 print no total.
4652 private long totalCount ;
4753 private boolean isApproximated ; // Total count is an approximated value
// Holder for a Future that may supply the total count (null when none was given).
4854 private final AtomicReference <Future <Long >> futureTotalCount = new AtomicReference <>();
// Current count; its value is exposed through getCount().
4955 private final AtomicLong count ;
// System.currentTimeMillis() captured in the constructor; base for the elapsed time in log(...).
56+ private final long startTime ;
// (count, timestamp in millis) pairs, one appended per log(...) call; the oldest entry is dropped
// once more than 5 entries exist and it falls outside the progress rate window.
57+ private final LinkedList <Pair <Long , Long >> times = new LinkedList <>();
5058
// Count interval between batch-triggered log lines; shouldLog(...) ignores it unless it is > 0.
5159 private double batchSize ;
5260
5361 private Logger logger = LoggerFactory .getLogger (ProgressLogger .class );
5462
/**
 * Creates a progress logger without a known total count.
 * Uses 200 as numLinesLog and no explicit log frequency (the private constructor
 * then applies its one-minute default).
 *
 * @param message Text written at the start of every log line
 */
public ProgressLogger(String message) {
    this(message, 0, null, 200, 0);
}
66+
/**
 * Creates a progress logger driven by elapsed time instead of by line count.
 * The frequency is converted to milliseconds; when the result is not positive the
 * private constructor falls back to its default (one minute plus batch-based logging).
 *
 * @param message      Text written at the start of every log line
 * @param logFrequency Amount of time between log lines, expressed in {@code timeUnit}
 * @param timeUnit     Unit of {@code logFrequency}; must not be null
 */
public ProgressLogger(String message, long logFrequency, TimeUnit timeUnit) {
    this(message, 0, null, 0, timeUnit.toMillis(logFrequency));
}
5870
/**
 * Creates a progress logger for a known total count.
 * Uses 200 as numLinesLog and no explicit log frequency.
 *
 * @param message    Text written at the start of every log line
 * @param totalCount Total number of elements expected
 */
public ProgressLogger(String message, long totalCount) {
    this(message, totalCount, null, 200, 0);
}
6274
/**
 * Creates a progress logger for a known total count and a chosen number of log lines.
 * No explicit log frequency is passed.
 *
 * @param message     Text written at the start of every log line
 * @param totalCount  Total number of elements expected
 * @param numLinesLog Number of lines to print
 */
public ProgressLogger(String message, long totalCount, int numLinesLog) {
    this(message, totalCount, null, numLinesLog, 0);
}
6678
/**
 * Creates a progress logger whose total count is supplied by a {@link Future}.
 * Starts with a total of 0, uses 200 as numLinesLog and no explicit log frequency.
 *
 * @param message          Text written at the start of every log line
 * @param futureTotalCount Future that provides the total count
 */
public ProgressLogger(String message, Future<Long> futureTotalCount) {
    this(message, 0, futureTotalCount, 200, 0);
}
7082
/**
 * Creates a progress logger whose total count is supplied by a {@link Future},
 * with a chosen number of log lines. Starts with a total of 0 and no explicit log frequency.
 *
 * @param message          Text written at the start of every log line
 * @param futureTotalCount Future that provides the total count
 * @param numLinesLog      Number of lines to print
 */
public ProgressLogger(String message, Future<Long> futureTotalCount, int numLinesLog) {
    this(message, 0, futureTotalCount, numLinesLog, 0);
}
7486
7587 /**
@@ -79,10 +91,10 @@ public ProgressLogger(String message, Future<Long> futureTotalCount, int numLine
7991 * @param numLinesLog Number of lines to print
8092 */
public ProgressLogger(String message, Callable<Long> totalCountCallable, int numLinesLog) {
    // getFuture(...) turns the callable into a Future; the total starts at 0 and
    // no explicit log frequency is passed.
    this(message, 0, getFuture(totalCountCallable), numLinesLog, 0);
}
8496
85- private ProgressLogger (String message , long totalCount , Future <Long > futureTotalCount , int numLinesLog ) {
97+ private ProgressLogger (String message , long totalCount , Future <Long > futureTotalCount , int numLinesLog , long logFrequencyMillis ) {
8698 if (message .endsWith (" " )) {
8799 this .message = message ;
88100 } else {
@@ -92,12 +104,21 @@ private ProgressLogger(String message, long totalCount, Future<Long> futureTotal
92104 this .totalCount = totalCount ;
93105 this .futureTotalCount .set (futureTotalCount );
94106 this .count = new AtomicLong ();
95- if (totalCount == 0 ) {
96- batchSize = DEFAULT_BATCH_SIZE ;
107+ if (logFrequencyMillis > 0 ) {
108+ this .logFrequencyMillis = logFrequencyMillis ;
109+ batchSize = 0 ;
97110 } else {
98- updateBatchSize ();
111+ // Avoid not logging for too long. Log at least once a minute by default
112+ this .logFrequencyMillis = TimeUnit .MINUTES .toMillis (1 );
113+ if (totalCount == 0 ) {
114+ batchSize = DEFAULT_BATCH_SIZE ;
115+ } else {
116+ updateBatchSize ();
117+ }
99118 }
100119 isApproximated = false ;
120+ startTime = System .currentTimeMillis ();
121+ progressRateWindowSizeSeconds = 60 ;
101122 }
102123
103124
@@ -118,6 +139,25 @@ public ProgressLogger setApproximateTotalCount(long aproximateTotalCount) {
118139 return this ;
119140 }
120141
142+ public ProgressLogger setProgressRateWindowSize (int progressRateWindowSize , TimeUnit timeUnit ) {
143+ this .progressRateWindowSizeSeconds = timeUnit .toSeconds (progressRateWindowSize );
144+ return this ;
145+ }
146+
147+ public ProgressLogger setProgressRateAtMillionsPerHours () {
148+ return setProgressRateAtMillionsPerHours (true );
149+ }
150+
151+ public ProgressLogger setProgressRateAtMillionsPerHours (boolean progressRateMillionHours ) {
152+ this .progressRateMillionHours = progressRateMillionHours ;
153+ return this ;
154+ }
155+
156+ public ProgressLogger disableProgressRate () {
157+ this .progressRateEnabled = false ;
158+ return this ;
159+ }
160+
121161 public void increment (long delta ) {
122162 increment (delta , "" , null );
123163 }
@@ -135,13 +175,37 @@ private void increment(long delta, String message, Supplier<String> supplier) {
135175 long count = previousCount + delta ;
136176
137177 updateFutureTotalCount ();
138- if ((int ) (previousCount / batchSize ) != (int ) (count / batchSize ) || count == totalCount && delta > 0 ) {
139- log (count , supplier == null ? message : supplier .get ());
178+ long currentTimeMillis = System .currentTimeMillis ();
179+ if (shouldLog (delta , previousCount , count , currentTimeMillis )) {
180+ log (count , supplier == null ? message : supplier .get (), currentTimeMillis );
181+ }
182+ }
183+
184+ private boolean shouldLog (long delta , long previousCount , long count , long currentTimeMillis ) {
185+ if (batchSize > 0 ) {
186+ if ((int ) (previousCount / batchSize ) != (int ) (count / batchSize )) {
187+ return true ;
188+ }
189+ }
190+ if (logFrequencyMillis > 0 ) {
191+ long lastLogTime = times .isEmpty () ? startTime : times .getLast ().getRight ();
192+ if (currentTimeMillis - lastLogTime > logFrequencyMillis ) {
193+ return true ;
194+ }
195+ }
196+ if (count == totalCount && delta > 0 ) {
197+ return true ;
140198 }
199+ return false ;
141200 }
142201
143- protected synchronized void log (long count , String extraMessage ) {
144- long totalCount = getTotalCount ();
202+ protected synchronized void log (long count , String extraMessage , long currentTimeMillis ) {
203+ times .add (Pair .of (count , currentTimeMillis ));
204+ if (times .size () > 5 && times .get (0 ).getRight () < currentTimeMillis - progressRateWindowSizeSeconds * 1000 ) {
205+ // Remove old points that are outside the progress rate window
206+ times .removeFirst ();
207+ }
208+ long totalCount = this .totalCount ;
145209
146210 StringBuilder sb = new StringBuilder (message ).append (count );
147211 if (totalCount > 0 ) {
@@ -152,6 +216,45 @@ protected synchronized void log(long count, String extraMessage) {
152216 }
153217 sb .append (totalCount ).append (' ' ).append (DECIMAL_FORMAT .format (((float ) (count )) / totalCount ));
154218 }
219+ if (progressRateEnabled ) {
220+ float elapsedTime = (float ) (currentTimeMillis - startTime ) / 1000 ;
221+ float progressRate = count / elapsedTime ; // elements per second
222+ boolean addRelativeTime = times .size () > 5 && elapsedTime > progressRateWindowSizeSeconds ;
223+ float relativeTime ;
224+ float relativeProgressRate ; // elements per second
225+ if (addRelativeTime ) {
226+ int idx = 5 ;
227+ do {
228+ Pair <Long , Long > relativePoint = times .get (times .size () - idx );
229+ relativeTime = (float ) (currentTimeMillis - relativePoint .getRight ()) / 1000 ;
230+ relativeProgressRate = (count - relativePoint .getLeft ()) / relativeTime ;
231+ } while (relativeTime < progressRateWindowSizeSeconds && idx ++ < times .size ());
232+
233+ } else {
234+ relativeTime = 0 ;
235+ relativeProgressRate = 0 ;
236+ }
237+ String progressRateUnits ;
238+ String rateFormat ;
239+ if (progressRateMillionHours ) {
240+ progressRateUnits = "M/h" ;
241+ rateFormat = "%.2f" ;
242+ progressRate = (progressRate / 1_000_000 ) * 3600 ; // Convert to millions per hour
243+ relativeProgressRate = (relativeProgressRate / 1_000_000 ) * 3600 ; // Convert to millions per hour
244+ } else {
245+ progressRateUnits = "elements/s" ;
246+ rateFormat = "%.0f" ;
247+ }
248+ sb .append (" in " )
249+ .append (String .format ("%.2f" , elapsedTime )).append ("s (" )
250+ .append (String .format (rateFormat , progressRate )).append (" " + progressRateUnits + ")" );
251+ if (addRelativeTime ) {
252+ sb .append (", (" )
253+ .append (String .format (rateFormat , relativeProgressRate )).append (" " + progressRateUnits + " in last " )
254+ .append (String .format ("%.2f" , relativeTime )).append ("s" )
255+ .append (')' );
256+ }
257+ }
155258 if (!extraMessage .isEmpty () && (!extraMessage .startsWith (" " ) && !extraMessage .startsWith ("," ) && !extraMessage .startsWith ("." ))) {
156259 sb .append (' ' );
157260 }
@@ -181,10 +284,6 @@ private void updateFutureTotalCount() {
181284 }
182285 }
183286
184- private long getTotalCount () {
185- return this .totalCount ;
186- }
187-
188287 private void updateBatchSize () {
189288 batchSize = Math .max ((double ) totalCount / numLinesLog , MIN_BATCH_SIZE );
190289 }
@@ -196,6 +295,10 @@ private static Future<Long> getFuture(Callable<Long> totalCountCallable) {
196295 return future ;
197296 }
198297
298+ public long getCount () {
299+ return count .get ();
300+ }
301+
199302 public <T > Task <T , T > asTask () {
200303 return asTask (null );
201304 }
0 commit comments