Skip to content

Commit 044f746

Browse files
rnewsonnickva
authored andcommitted
enhance _nouveau_cleanup
Allow a glob pattern so we can delete indexes for previous versions of a database.
1 parent 866348c commit 044f746

File tree

4 files changed

+112
-35
lines changed

4 files changed

+112
-35
lines changed

nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.locks.Lock;
3535
import java.util.concurrent.locks.ReentrantReadWriteLock;
36-
import java.util.stream.Stream;
3736
import org.apache.couchdb.nouveau.api.IndexDefinition;
3837
import org.apache.couchdb.nouveau.lucene9.Lucene9AnalyzerFactory;
3938
import org.apache.couchdb.nouveau.lucene9.Lucene9Index;
@@ -250,35 +249,49 @@ public boolean exists(final String name) {
250249
}
251250

252251
public void deleteAll(final String path, final List<String> exclusions) throws IOException {
253-
LOGGER.info("deleting indexes below {} (excluding {})", path, exclusions == null ? "nothing" : exclusions);
252+
LOGGER.info(
253+
"deleting indexes matching {} (excluding {})",
254+
path,
255+
exclusions == null || exclusions.isEmpty() ? "nothing" : exclusions);
256+
var parts = path.split("/");
257+
deleteAll(rootDir, parts, 0, exclusions);
258+
}
254259

255-
final Path indexRootPath = indexRootPath(path);
256-
if (!indexRootPath.toFile().exists()) {
260+
private void deleteAll(final Path path, final String[] parts, final int index, final List<String> exclusions)
261+
throws IOException {
262+
// End of the path
263+
if (index == parts.length - 1) {
264+
try (var stream = Files.newDirectoryStream(path, parts[index])) {
265+
stream.forEach(p -> {
266+
if (exclusions != null && exclusions.indexOf(p.getFileName().toString()) != -1) {
267+
return;
268+
}
269+
final String relativeName = rootDir.relativize(p).toString();
270+
try {
271+
deleteIndex(relativeName);
272+
} catch (final IOException | InterruptedException e) {
273+
LOGGER.error("Exception deleting {}", p, e);
274+
}
275+
// Clean any newly empty directories.
276+
do {
277+
final File f = p.toFile();
278+
if (f.isDirectory() && f.list().length == 0) {
279+
f.delete();
280+
}
281+
} while ((p = p.getParent()) != null && !rootDir.equals(p));
282+
});
283+
}
257284
return;
258285
}
259-
Stream<Path> stream = Files.find(indexRootPath, 100, (p, attr) -> attr.isDirectory() && isIndex(p));
260-
try {
261-
stream.forEach((p) -> {
262-
final String relativeToExclusions = indexRootPath.relativize(p).toString();
263-
if (exclusions != null && exclusions.indexOf(relativeToExclusions) != -1) {
264-
return;
265-
}
266-
final String relativeName = rootDir.relativize(p).toString();
286+
// Recurse
287+
try (var stream = Files.newDirectoryStream(path, parts[index])) {
288+
stream.forEach(p -> {
267289
try {
268-
deleteIndex(relativeName);
269-
} catch (final IOException | InterruptedException e) {
270-
LOGGER.error("Exception deleting {}", p, e);
290+
deleteAll(p, parts, index + 1, exclusions);
291+
} catch (IOException e) {
292+
LOGGER.warn("Exception during delete of " + rootDir.relativize(p), e);
271293
}
272-
// Clean any newly empty directories.
273-
do {
274-
final File f = p.toFile();
275-
if (f.isDirectory() && f.list().length == 0) {
276-
f.delete();
277-
}
278-
} while ((p = p.getParent()) != null && !rootDir.equals(p));
279294
});
280-
} finally {
281-
stream.close();
282295
}
283296
}
284297

nouveau/src/test/java/org/apache/couchdb/nouveau/core/IndexManagerTest.java

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,32 @@
1616
import static org.assertj.core.api.Assertions.assertThat;
1717

1818
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import java.io.IOException;
20+
import java.nio.file.Files;
1921
import java.nio.file.Path;
22+
import java.util.List;
2023
import java.util.concurrent.Executors;
2124
import java.util.concurrent.ForkJoinPool;
2225
import java.util.concurrent.ScheduledExecutorService;
2326
import java.util.concurrent.TimeUnit;
2427
import org.apache.couchdb.nouveau.api.IndexDefinition;
2528
import org.apache.couchdb.nouveau.api.SearchRequest;
2629
import org.apache.couchdb.nouveau.lucene9.ParallelSearcherFactory;
27-
import org.junit.jupiter.api.AfterAll;
28-
import org.junit.jupiter.api.BeforeAll;
30+
import org.junit.jupiter.api.AfterEach;
31+
import org.junit.jupiter.api.BeforeEach;
2932
import org.junit.jupiter.api.Test;
3033
import org.junit.jupiter.api.io.TempDir;
3134

3235
public class IndexManagerTest {
3336

34-
private static IndexManager manager;
35-
private static ScheduledExecutorService executorService;
37+
private Path rootDir;
38+
private IndexManager manager;
39+
private ScheduledExecutorService executorService;
3640

37-
@BeforeAll
38-
public static void setupManager(@TempDir Path path) throws Exception {
41+
@BeforeEach
42+
public void setupManager(@TempDir Path path) throws Exception {
3943
executorService = Executors.newScheduledThreadPool(2);
44+
rootDir = path;
4045

4146
manager = new IndexManager();
4247
manager.setRootDir(path);
@@ -47,8 +52,8 @@ public static void setupManager(@TempDir Path path) throws Exception {
4752
manager.start();
4853
}
4954

50-
@AfterAll
51-
public static void cleanup() throws Exception {
55+
@AfterEach
56+
public void cleanup() throws Exception {
5257
executorService.shutdownNow();
5358
executorService.awaitTermination(5, TimeUnit.SECONDS);
5459
manager.stop();
@@ -82,4 +87,60 @@ public void managerReopensAClosedIndex() throws Exception {
8287
});
8388
assertThat(isOpen);
8489
}
90+
91+
@Test
92+
public void deleteAllRemovesIndexByName() throws Exception {
93+
final IndexDefinition indexDefinition = new IndexDefinition();
94+
indexDefinition.setDefaultAnalyzer("standard");
95+
96+
assertThat(countIndexes()).isEqualTo(0);
97+
manager.create("bar", indexDefinition);
98+
assertThat(countIndexes()).isEqualTo(1);
99+
manager.deleteAll("bar", null);
100+
assertThat(countIndexes()).isEqualTo(0);
101+
}
102+
103+
@Test
104+
public void deleteAllRemovesIndexByPath() throws Exception {
105+
final IndexDefinition indexDefinition = new IndexDefinition();
106+
indexDefinition.setDefaultAnalyzer("standard");
107+
108+
assertThat(countIndexes()).isEqualTo(0);
109+
manager.create("foo/bar", indexDefinition);
110+
assertThat(countIndexes()).isEqualTo(1);
111+
manager.deleteAll("foo/bar", null);
112+
assertThat(countIndexes()).isEqualTo(0);
113+
}
114+
115+
@Test
116+
public void deleteAllRemovesIndexByGlob() throws Exception {
117+
final IndexDefinition indexDefinition = new IndexDefinition();
118+
indexDefinition.setDefaultAnalyzer("standard");
119+
120+
assertThat(countIndexes()).isEqualTo(0);
121+
manager.create("foo/bar", indexDefinition);
122+
assertThat(countIndexes()).isEqualTo(1);
123+
manager.deleteAll("foo/*", null);
124+
assertThat(countIndexes()).isEqualTo(0);
125+
}
126+
127+
@Test
128+
public void deleteAllRemovesIndexByGlobExceptExclusions() throws Exception {
129+
final IndexDefinition indexDefinition = new IndexDefinition();
130+
indexDefinition.setDefaultAnalyzer("standard");
131+
132+
assertThat(countIndexes()).isEqualTo(0);
133+
manager.create("foo/bar", indexDefinition);
134+
manager.create("foo/baz", indexDefinition);
135+
assertThat(countIndexes()).isEqualTo(2);
136+
manager.deleteAll("foo/*", List.of("bar"));
137+
assertThat(countIndexes()).isEqualTo(1);
138+
}
139+
140+
private long countIndexes() throws IOException {
141+
try (var stream =
142+
Files.find(rootDir, 10, (p, attr) -> p.getFileName().toString().equals("index_definition.json"))) {
143+
return stream.count();
144+
}
145+
}
85146
}

src/nouveau/src/nouveau_fabric_cleanup.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ go(DbName) ->
4040
Shards = mem3:shards(DbName),
4141
lists:foreach(
4242
fun(Shard) ->
43-
rexi:cast(Shard#shard.node, {nouveau_rpc, cleanup, [Shard#shard.name, ActiveSigs]})
43+
Path =
44+
<<"shards/", (mem3_util:range_to_hex(Shard#shard.range))/binary, "/", DbName/binary,
45+
".*/*">>,
46+
rexi:cast(Shard#shard.node, {nouveau_rpc, cleanup, [Path, ActiveSigs]})
4447
end,
4548
Shards
4649
).

src/nouveau/src/nouveau_rpc.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,6 @@ info(DbName, #index{} = Index0) ->
8989
rexi:reply({error, Reason})
9090
end.
9191

92-
cleanup(DbName, Exclusions) ->
93-
nouveau_api:delete_path(nouveau_util:index_name(DbName), Exclusions),
92+
cleanup(Path, Exclusions) ->
93+
nouveau_api:delete_path(nouveau_util:index_name(Path), Exclusions),
9494
rexi:reply(ok).

0 commit comments

Comments
 (0)