Skip to content

Commit ae7d88c

Browse files
committed
HDDS-14053. Make MinHeapIterator in SstFileSetReader more generic to work with any AutoCloseable iterator
Change-Id: I18461f4f48849ec8442ba89531f1ba8951aa6685
1 parent 2930ecc commit ae7d88c

File tree

4 files changed

+271
-116
lines changed

4 files changed

+271
-116
lines changed

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ClosableIterator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
package org.apache.hadoop.ozone.util;
1919

20+
import java.io.Closeable;
2021
import java.util.Iterator;
2122

2223
/**
2324
* An {@link Iterator} that may hold resources until it is closed.
* <p>
* {@link #close()} is redeclared here without a {@code throws} clause, so
* implementations cannot throw a checked exception from it and callers can
* close the iterator without a {@code catch} block.
2425
*/
25-
public interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
26+
public interface ClosableIterator<E> extends Iterator<E>, Closeable {
2627
@Override
2728
void close();
2829
}

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,24 @@
1717

1818
package org.apache.hadoop.hdds.utils.db;
1919

20+
import static org.apache.hadoop.hdds.utils.db.Table.newKeyValue;
21+
2022
import java.io.File;
23+
import java.io.IOException;
24+
import java.util.Collection;
25+
import java.util.Comparator;
2126
import java.util.List;
2227
import java.util.Map;
28+
import java.util.NoSuchElementException;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.IntStream;
2331
import org.apache.hadoop.hdds.annotation.InterfaceStability;
32+
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
2433
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
2534
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
2635
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
36+
import org.apache.hadoop.ozone.util.ClosableIterator;
37+
import org.apache.ozone.rocksdb.util.MinHeapMergeIterator;
2738
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
2839
import org.apache.ratis.util.UncheckedAutoCloseable;
2940

@@ -170,4 +181,39 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
170181
boolean isClosed();
171182

172183
String getSnapshotsParentDir();
184+
185+
/**
186+
* Creates an iterator that merges multiple tables into a single iterator,
187+
* grouping values with the same key across the tables.
188+
*
189+
* @param <KEY> the type of keys for the tables
190+
* @param keyComparator the comparator used to compare keys from different tables
191+
* @param prefix the prefix used to filter entries of each table
192+
* @param table one or more tables to merge
193+
* @return a closable iterator over merged key-value pairs, where each key corresponds
194+
* to a collection of values from the tables
195+
*/
196+
default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
197+
Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
198+
List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
199+
KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
200+
Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
201+
return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
202+
KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
203+
@Override
204+
protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
205+
return table[idx].iterator(prefix);
206+
}
207+
208+
@Override
209+
protected KeyValue<KEY, Collection<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
210+
KEY key = keysToMerge.values().stream().findAny()
211+
.orElseThrow(() -> new NoSuchElementException("No keys found")).getKey();
212+
for (int i = 0; i < tableValues.size(); i++) {
213+
tableValues.set(i, keysToMerge.getOrDefault(i, defaultNullValue).getValue());
214+
}
215+
return newKeyValue(key, tableValues);
216+
}
217+
};
218+
}
173219
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ozone.rocksdb.util;
19+
20+
import jakarta.annotation.Nonnull;
21+
import java.io.Closeable;
22+
import java.io.IOException;
23+
import java.io.UncheckedIOException;
24+
import java.util.Comparator;
25+
import java.util.HashMap;
26+
import java.util.Iterator;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.NoSuchElementException;
30+
import java.util.Objects;
31+
import java.util.PriorityQueue;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.IntStream;
34+
import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
35+
import org.apache.hadoop.ozone.util.ClosableIterator;
36+
import org.rocksdb.RocksDBException;
37+
38+
/**
39+
* An abstract class that provides functionality to merge elements from multiple sorted iterators
40+
* using a min-heap. The {@code MinHeapMergeIterator} ensures the merged output is in sorted order
41+
* by repeatedly polling the smallest element from the heap of iterators.
42+
*
43+
* @param <K> the type of keys being merged, must be {@link Comparable}
44+
* @param <I> the type of iterators being used, must extend {@link Iterator} and implement {@link Closeable}
45+
* @param <V> the type of the final merged output
46+
*/
47+
public abstract class MinHeapMergeIterator<K, I extends Iterator<K> & Closeable, V>
48+
implements ClosableIterator<V> {
49+
private final PriorityQueue<HeapEntry<K>> minHeap;
50+
private final Map<Integer, K> keys;
51+
private final List<I> iterators;
52+
private boolean initialized;
53+
private final Comparator<K> comparator;
54+
55+
public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator) {
56+
this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
57+
keys = new HashMap<>(numberOfIterators);
58+
iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I) null).collect(Collectors.toList());
59+
this.initialized = false;
60+
this.comparator = comparator;
61+
}
62+
63+
protected abstract I getIterator(int idx) throws RocksDBException, IOException;
64+
65+
private boolean initHeap() throws IOException, RocksDBException {
66+
if (initialized) {
67+
return false;
68+
}
69+
initialized = true;
70+
int count = 0;
71+
try {
72+
for (int idx = 0; idx < iterators.size(); idx++) {
73+
I itr = getIterator(idx);
74+
iterators.set(idx, itr);
75+
HeapEntry<K> entry = new HeapEntry<>(idx, itr, comparator);
76+
if (entry.getCurrentKey() != null) {
77+
minHeap.add(entry);
78+
count++;
79+
} else {
80+
// No valid entries, close the iterator.
81+
closeItrAtIndex(idx);
82+
}
83+
}
84+
} catch (IOException | RocksDBException e) {
85+
close();
86+
throw e;
87+
}
88+
return count > 0;
89+
}
90+
91+
@Override
92+
public boolean hasNext() {
93+
try {
94+
return !minHeap.isEmpty() || initHeap();
95+
} catch (IOException e) {
96+
throw new UncheckedIOException(e);
97+
} catch (RocksDBException e) {
98+
throw new UncheckedIOException(new RocksDatabaseException("Error while initializing iterator ", e));
99+
}
100+
}
101+
102+
protected abstract V merge(Map<Integer, K> keysToMerge);
103+
104+
@Override
105+
public V next() {
106+
if (!hasNext()) {
107+
throw new NoSuchElementException("No more elements found.");
108+
}
109+
110+
assert minHeap.peek() != null;
111+
// Get current key from heap
112+
K currentKey = minHeap.peek().getCurrentKey();
113+
// Clear the keys list by setting all entries to null.
114+
keys.clear();
115+
// Advance all entries with the same key (from different files)
116+
while (!minHeap.isEmpty() && Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
117+
HeapEntry<K> entry = minHeap.poll();
118+
int idx = entry.index;
119+
// Set the key for the current entry in the keys list.
120+
keys.put(idx, entry.getCurrentKey());
121+
if (entry.advance()) {
122+
minHeap.offer(entry);
123+
} else {
124+
// Iterator is exhausted, close it to prevent resource leak
125+
try {
126+
closeItrAtIndex(idx);
127+
} catch (IOException e) {
128+
throw new UncheckedIOException(e);
129+
}
130+
}
131+
}
132+
return merge(keys);
133+
}
134+
135+
private void closeItrAtIndex(int idx) throws IOException {
136+
if (iterators.get(idx) != null) {
137+
iterators.get(idx).close();
138+
iterators.set(idx, null);
139+
}
140+
}
141+
142+
public void close() {
143+
IOException exception = null;
144+
for (int idx = 0; idx < iterators.size(); idx++) {
145+
try {
146+
closeItrAtIndex(idx);
147+
} catch (IOException e) {
148+
exception = e;
149+
}
150+
}
151+
if (exception != null) {
152+
throw new UncheckedIOException(exception);
153+
}
154+
}
155+
156+
/**
157+
* A wrapper class that holds an iterator and its current value for heap operations.
158+
*/
159+
private static final class HeapEntry<T> implements Comparable<HeapEntry<T>> {
160+
private final int index;
161+
private final Iterator<T> iterator;
162+
private T currentKey;
163+
private Comparator<T> comparator;
164+
165+
private HeapEntry(int index, Iterator<T> iterator, Comparator<T> comparator) {
166+
this.iterator = iterator;
167+
this.index = index;
168+
this.comparator = comparator;
169+
advance();
170+
}
171+
172+
private boolean advance() {
173+
if (iterator.hasNext()) {
174+
currentKey = iterator.next();
175+
return true;
176+
} else {
177+
currentKey = null;
178+
return false;
179+
}
180+
}
181+
182+
private T getCurrentKey() {
183+
return currentKey;
184+
}
185+
186+
@Override
187+
public int compareTo(@Nonnull HeapEntry<T> other) {
188+
return Comparator.comparing(HeapEntry<T>::getCurrentKey, this.comparator).compare(this, other);
189+
}
190+
191+
@Override
192+
@SuppressWarnings("unchecked")
193+
public boolean equals(Object obj) {
194+
if (this == obj) {
195+
return true;
196+
}
197+
if (obj == null || getClass() != obj.getClass()) {
198+
return false;
199+
}
200+
201+
HeapEntry<T> other = (HeapEntry<T>) obj;
202+
return this.compareTo(other) == 0;
203+
}
204+
205+
@Override
206+
public int hashCode() {
207+
return currentKey.hashCode();
208+
}
209+
}
210+
}

0 commit comments

Comments
 (0)