Skip to content

Commit ad76281

Browse files
committed
Merge branch 'develop' into TASK-7610
2 parents fab7c14 + 506086f commit ad76281

File tree

6 files changed

+227
-37
lines changed

6 files changed

+227
-37
lines changed

commons-datastore/commons-datastore-solr/src/main/java/org/opencb/commons/datastore/solr/SolrFacetToFacetFieldsConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,14 @@ private static void parseBuckets(SimpleOrderedMap<Object> solrFacets, FacetField
103103

104104
List<FacetField.Bucket> buckets = new ArrayList<>();
105105
for (SimpleOrderedMap<Object> solrBucket: solrBuckets) {
106-
int count = 0;
106+
long count = 0;
107107
String value = "";
108108
FacetField subfield;
109109
List<FacetField> subfields = new ArrayList<>();
110110
for (int i = 0; i < solrBucket.size(); i++) {
111111
String fullname = solrBucket.getName(i);
112112
if ("count".equals(fullname)) {
113-
count = (int) solrBucket.getVal(i);
113+
count = ((Number) solrBucket.getVal(i)).longValue();
114114
} else if ("val".equals(fullname)) {
115115
value = solrBucket.getVal(i).toString();
116116
} else {

commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java

Lines changed: 121 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package org.opencb.commons;
1818

19+
import org.apache.commons.lang3.tuple.Pair;
1920
import org.opencb.commons.run.Task;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

2324
import java.text.DecimalFormat;
25+
import java.util.LinkedList;
2426
import java.util.List;
2527
import java.util.concurrent.*;
2628
import java.util.concurrent.atomic.AtomicLong;
@@ -43,33 +45,43 @@ public class ProgressLogger {
4345

4446
private final String message;
4547
private final int numLinesLog;
48+
private final long logFrequencyMillis;
49+
private boolean progressRateEnabled = true;
50+
private long progressRateWindowSizeSeconds;
51+
private boolean progressRateMillionHours = false; // If true, progress rate is in millions of elements per hour
4652
private long totalCount;
4753
private boolean isApproximated; // Total count is an approximated value
4854
private final AtomicReference<Future<Long>> futureTotalCount = new AtomicReference<>();
4955
private final AtomicLong count;
56+
private final long startTime;
57+
private final LinkedList<Pair<Long, Long>> times = new LinkedList<>();
5058

5159
private double batchSize;
5260

5361
private Logger logger = LoggerFactory.getLogger(ProgressLogger.class);
5462

5563
public ProgressLogger(String message) {
56-
this(message, 0, null, 200);
64+
this(message, 0, null, 200, 0);
65+
}
66+
67+
public ProgressLogger(String message, long logFrequency, TimeUnit timeUnit) {
68+
this(message, 0, null, 0, timeUnit.toMillis(logFrequency));
5769
}
5870

5971
public ProgressLogger(String message, long totalCount) {
60-
this(message, totalCount, null, 200);
72+
this(message, totalCount, null, 200, 0);
6173
}
6274

6375
public ProgressLogger(String message, long totalCount, int numLinesLog) {
64-
this(message, totalCount, null, numLinesLog);
76+
this(message, totalCount, null, numLinesLog, 0);
6577
}
6678

6779
public ProgressLogger(String message, Future<Long> futureTotalCount) {
68-
this(message, 0, futureTotalCount, 200);
80+
this(message, 0, futureTotalCount, 200, 0);
6981
}
7082

7183
public ProgressLogger(String message, Future<Long> futureTotalCount, int numLinesLog) {
72-
this(message, 0, futureTotalCount, numLinesLog);
84+
this(message, 0, futureTotalCount, numLinesLog, 0);
7385
}
7486

7587
/**
@@ -79,10 +91,10 @@ public ProgressLogger(String message, Future<Long> futureTotalCount, int numLine
7991
* @param numLinesLog Number of lines to print
8092
*/
8193
public ProgressLogger(String message, Callable<Long> totalCountCallable, int numLinesLog) {
82-
this(message, 0, getFuture(totalCountCallable), numLinesLog);
94+
this(message, 0, getFuture(totalCountCallable), numLinesLog, 0);
8395
}
8496

85-
private ProgressLogger(String message, long totalCount, Future<Long> futureTotalCount, int numLinesLog) {
97+
private ProgressLogger(String message, long totalCount, Future<Long> futureTotalCount, int numLinesLog, long logFrequencyMillis) {
8698
if (message.endsWith(" ")) {
8799
this.message = message;
88100
} else {
@@ -92,12 +104,21 @@ private ProgressLogger(String message, long totalCount, Future<Long> futureTotal
92104
this.totalCount = totalCount;
93105
this.futureTotalCount.set(futureTotalCount);
94106
this.count = new AtomicLong();
95-
if (totalCount == 0) {
96-
batchSize = DEFAULT_BATCH_SIZE;
107+
if (logFrequencyMillis > 0) {
108+
this.logFrequencyMillis = logFrequencyMillis;
109+
batchSize = 0;
97110
} else {
98-
updateBatchSize();
111+
// Avoid not logging for too long. Log at least once a minute by default
112+
this.logFrequencyMillis = TimeUnit.MINUTES.toMillis(1);
113+
if (totalCount == 0) {
114+
batchSize = DEFAULT_BATCH_SIZE;
115+
} else {
116+
updateBatchSize();
117+
}
99118
}
100119
isApproximated = false;
120+
startTime = System.currentTimeMillis();
121+
progressRateWindowSizeSeconds = 60;
101122
}
102123

103124

@@ -118,6 +139,25 @@ public ProgressLogger setApproximateTotalCount(long aproximateTotalCount) {
118139
return this;
119140
}
120141

142+
public ProgressLogger setProgressRateWindowSize(int progressRateWindowSize, TimeUnit timeUnit) {
143+
this.progressRateWindowSizeSeconds = timeUnit.toSeconds(progressRateWindowSize);
144+
return this;
145+
}
146+
147+
public ProgressLogger setProgressRateAtMillionsPerHours() {
148+
return setProgressRateAtMillionsPerHours(true);
149+
}
150+
151+
public ProgressLogger setProgressRateAtMillionsPerHours(boolean progressRateMillionHours) {
152+
this.progressRateMillionHours = progressRateMillionHours;
153+
return this;
154+
}
155+
156+
public ProgressLogger disableProgressRate() {
157+
this.progressRateEnabled = false;
158+
return this;
159+
}
160+
121161
public void increment(long delta) {
122162
increment(delta, "", null);
123163
}
@@ -135,13 +175,37 @@ private void increment(long delta, String message, Supplier<String> supplier) {
135175
long count = previousCount + delta;
136176

137177
updateFutureTotalCount();
138-
if ((int) (previousCount / batchSize) != (int) (count / batchSize) || count == totalCount && delta > 0) {
139-
log(count, supplier == null ? message : supplier.get());
178+
long currentTimeMillis = System.currentTimeMillis();
179+
if (shouldLog(delta, previousCount, count, currentTimeMillis)) {
180+
log(count, supplier == null ? message : supplier.get(), currentTimeMillis);
181+
}
182+
}
183+
184+
private boolean shouldLog(long delta, long previousCount, long count, long currentTimeMillis) {
185+
if (batchSize > 0) {
186+
if ((int) (previousCount / batchSize) != (int) (count / batchSize)) {
187+
return true;
188+
}
189+
}
190+
if (logFrequencyMillis > 0) {
191+
long lastLogTime = times.isEmpty() ? startTime : times.getLast().getRight();
192+
if (currentTimeMillis - lastLogTime > logFrequencyMillis) {
193+
return true;
194+
}
195+
}
196+
if (count == totalCount && delta > 0) {
197+
return true;
140198
}
199+
return false;
141200
}
142201

143-
protected synchronized void log(long count, String extraMessage) {
144-
long totalCount = getTotalCount();
202+
protected synchronized void log(long count, String extraMessage, long currentTimeMillis) {
203+
times.add(Pair.of(count, currentTimeMillis));
204+
if (times.size() > 5 && times.get(0).getRight() < currentTimeMillis - progressRateWindowSizeSeconds * 1000) {
205+
// Remove old points that are outside the progress rate window
206+
times.removeFirst();
207+
}
208+
long totalCount = this.totalCount;
145209

146210
StringBuilder sb = new StringBuilder(message).append(count);
147211
if (totalCount > 0) {
@@ -152,6 +216,45 @@ protected synchronized void log(long count, String extraMessage) {
152216
}
153217
sb.append(totalCount).append(' ').append(DECIMAL_FORMAT.format(((float) (count)) / totalCount));
154218
}
219+
if (progressRateEnabled) {
220+
float elapsedTime = (float) (currentTimeMillis - startTime) / 1000;
221+
float progressRate = count / elapsedTime; // elements per second
222+
boolean addRelativeTime = times.size() > 5 && elapsedTime > progressRateWindowSizeSeconds;
223+
float relativeTime;
224+
float relativeProgressRate; // elements per second
225+
if (addRelativeTime) {
226+
int idx = 5;
227+
do {
228+
Pair<Long, Long> relativePoint = times.get(times.size() - idx);
229+
relativeTime = (float) (currentTimeMillis - relativePoint.getRight()) / 1000;
230+
relativeProgressRate = (count - relativePoint.getLeft()) / relativeTime;
231+
} while (relativeTime < progressRateWindowSizeSeconds && idx++ < times.size());
232+
233+
} else {
234+
relativeTime = 0;
235+
relativeProgressRate = 0;
236+
}
237+
String progressRateUnits;
238+
String rateFormat;
239+
if (progressRateMillionHours) {
240+
progressRateUnits = "M/h";
241+
rateFormat = "%.2f";
242+
progressRate = (progressRate / 1_000_000) * 3600; // Convert to millions per hour
243+
relativeProgressRate = (relativeProgressRate / 1_000_000) * 3600; // Convert to millions per hour
244+
} else {
245+
progressRateUnits = "elements/s";
246+
rateFormat = "%.0f";
247+
}
248+
sb.append(" in ")
249+
.append(String.format("%.2f", elapsedTime)).append("s (")
250+
.append(String.format(rateFormat, progressRate)).append(" " + progressRateUnits + ")");
251+
if (addRelativeTime) {
252+
sb.append(", (")
253+
.append(String.format(rateFormat, relativeProgressRate)).append(" " + progressRateUnits + " in last ")
254+
.append(String.format("%.2f", relativeTime)).append("s")
255+
.append(')');
256+
}
257+
}
155258
if (!extraMessage.isEmpty() && (!extraMessage.startsWith(" ") && !extraMessage.startsWith(",") && !extraMessage.startsWith("."))) {
156259
sb.append(' ');
157260
}
@@ -181,10 +284,6 @@ private void updateFutureTotalCount() {
181284
}
182285
}
183286

184-
private long getTotalCount() {
185-
return this.totalCount;
186-
}
187-
188287
private void updateBatchSize() {
189288
batchSize = Math.max((double) totalCount / numLinesLog, MIN_BATCH_SIZE);
190289
}
@@ -196,6 +295,10 @@ private static Future<Long> getFuture(Callable<Long> totalCountCallable) {
196295
return future;
197296
}
198297

298+
public long getCount() {
299+
return count.get();
300+
}
301+
199302
public <T> Task<T, T> asTask() {
200303
return asTask(null);
201304
}

commons-lib/src/main/java/org/opencb/commons/io/DataReader.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818

1919
import org.opencb.commons.run.Task;
2020

21-
import java.util.Collections;
22-
import java.util.Iterator;
23-
import java.util.List;
24-
import java.util.NoSuchElementException;
21+
import java.util.*;
22+
import java.util.function.Consumer;
2523
import java.util.stream.Stream;
2624
import java.util.stream.StreamSupport;
2725

@@ -163,4 +161,16 @@ public T next() {
163161
};
164162
}
165163

164+
@Override
165+
default void forEach(Consumer<? super T> action) {
166+
forEach(action, 1);
167+
}
168+
169+
default void forEach(Consumer<? super T> action, int batchSize) {
170+
Objects.requireNonNull(action);
171+
for (Iterator<T> iterator = this.iterator(batchSize); iterator.hasNext();) {
172+
T t = iterator.next();
173+
action.accept(t);
174+
}
175+
}
166176
}

commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,61 @@ public void post() throws Exception {
9898
};
9999
}
100100

101+
default DataWriter<T> then(DataWriter<T> nextWriter) {
102+
return then(nextWriter.asTask());
103+
}
104+
105+
default DataWriter<T> then(Task<T, ?> nextTask) {
106+
return new DataWriter<T>() {
107+
@Override
108+
public boolean open() {
109+
return DataWriter.this.open();
110+
}
111+
112+
@Override
113+
public boolean pre() {
114+
boolean res = DataWriter.this.pre();
115+
try {
116+
nextTask.pre();
117+
} catch (RuntimeException e) {
118+
throw e;
119+
} catch (Exception e) {
120+
throw new RuntimeException(e);
121+
}
122+
return res;
123+
}
124+
125+
@Override
126+
public boolean close() {
127+
return DataWriter.this.close();
128+
}
129+
130+
@Override
131+
public boolean post() {
132+
boolean res = DataWriter.this.post();
133+
try {
134+
nextTask.post();
135+
} catch (RuntimeException e) {
136+
throw e;
137+
} catch (Exception e) {
138+
throw new RuntimeException(e);
139+
}
140+
return res;
141+
}
142+
143+
@Override
144+
public boolean write(List<T> batch) {
145+
boolean res = DataWriter.this.write(batch);
146+
try {
147+
nextTask.apply(batch);
148+
return res;
149+
} catch (RuntimeException e) {
150+
throw e;
151+
} catch (Exception e) {
152+
throw new RuntimeException(e);
153+
}
154+
}
155+
};
156+
}
157+
101158
}

0 commit comments

Comments
 (0)