Skip to content

Commit 61b636c

Browse files
committed
close writer before soft-deletion
1 parent d8c828c commit 61b636c

File tree

6 files changed

+86
-28
lines changed

6 files changed

+86
-28
lines changed

src/main/scala/com/cloudant/clouseau/ClouseauTypeFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ case class SearchRequest(options: Map[Symbol, Any])
3030

3131
case class OpenIndexMsg(peer: Pid, path: String, options: Any)
3232
case class CleanupPathMsg(path: String)
33-
case class RenamePathMsg(dbName: String)
33+
case class RenamePathMsg(path: String)
3434
case class CleanupDbMsg(dbName: String, activeSigs: List[String])
3535
case class DiskSizeMsg(path: String)
3636

src/main/scala/com/cloudant/clouseau/IndexCleanupService.scala

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ package com.cloudant.clouseau
1515
import com.yammer.metrics.scala._
1616
import java.io.File
1717
import java.util.regex.Pattern
18-
import java.text.SimpleDateFormat
19-
import java.util.Calendar
20-
import java.util.TimeZone
2118
import org.apache.log4j.Logger
2219
import scalang._
2320

@@ -31,19 +28,10 @@ class IndexCleanupService(ctx: ServiceContext[ConfigurationArgs]) extends Servic
3128
val dir = new File(rootDir, path)
3229
logger.info("Removing %s".format(path))
3330
recursivelyDelete(dir)
34-
case RenamePathMsg(dbName: String) =>
35-
val srcDir = new File(rootDir, dbName)
36-
val sdf = new SimpleDateFormat("yyyyMMdd'.'HHmmss")
37-
sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
38-
val sdfNow = sdf.format(Calendar.getInstance().getTime())
39-
// move timestamp information in dbName to end of destination path
40-
// for example, from foo.1234567890 to foo.20170912.092828.deleted.1234567890
41-
val destPath = dbName.dropRight(10) + sdfNow + ".deleted." + dbName.takeRight(10)
42-
val destDir = new File(rootDir, destPath)
43-
logger.info("Renaming '%s' to '%s'".format(
44-
srcDir.getAbsolutePath, destDir.getAbsolutePath)
45-
)
46-
rename(srcDir, destDir)
31+
case RenamePathMsg(path: String) =>
32+
logger.info("Soft-deleting " + path)
33+
val pattern = Pattern.compile(path + "/([0-9a-f]+)$")
34+
rename(rootDir, path, pattern)
4735
case CleanupDbMsg(dbName: String, activeSigs: List[String]) =>
4836
logger.info("Cleaning up " + dbName)
4937
val pattern = Pattern.compile("shards/[0-9a-f]+-[0-9a-f]+/" + dbName + "\\.[0-9]+/([0-9a-f]+)$")
@@ -78,13 +66,23 @@ class IndexCleanupService(ctx: ServiceContext[ConfigurationArgs]) extends Servic
7866
fileOrDir.delete
7967
}
8068

81-
private def rename(srcDir: File, destDir: File) {
82-
if (!srcDir.isDirectory) {
69+
private def rename(fileOrDir: File, path: String, includePattern: Pattern) {
70+
if (!fileOrDir.isDirectory) {
8371
return
8472
}
85-
if (!srcDir.renameTo(destDir)) {
86-
logger.error("Failed to rename directory from '%s' to '%s'".format(
87-
srcDir.getAbsolutePath, destDir.getAbsolutePath))
73+
for (file <- fileOrDir.listFiles) {
74+
rename(file, path, includePattern)
75+
}
76+
77+
val m = includePattern.matcher(fileOrDir.getAbsolutePath)
78+
if (m.find) {
79+
logger.info("Soft-deleting index " + m.group)
80+
call('main, ('rename, m.group)) match {
81+
case 'ok =>
82+
'ok
83+
case ('error, 'not_found) =>
84+
Utils.rename(rootDir, path, fileOrDir.getName)
85+
}
8886
}
8987
}
9088

src/main/scala/com/cloudant/clouseau/IndexManagerService.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,18 @@ class IndexManagerService(ctx: ServiceContext[ConfigurationArgs]) extends Servic
119119
pid ! 'delete
120120
'ok
121121
}
122+
case ('rename, path: String) =>
123+
lru.get(path) match {
124+
case null =>
125+
('error, 'not_found)
126+
case pid: Pid =>
127+
call('pid, ('close_writer))
128+
val dbpath = path.substring(0, path.lastIndexOf('/'))
129+
val sig = path.substring(path.lastIndexOf('/') + 1)
130+
Utils.rename(rootDir, dbpath, sig)
131+
call('pid, ('exit_with_deleted))
132+
'ok
133+
}
122134
case DiskSizeMsg(path: String) =>
123135
getDiskSize(path)
124136
case 'close_lru =>

src/main/scala/com/cloudant/clouseau/IndexService.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
169169
exit("Idle Timeout")
170170
}
171171
idle = true
172+
case 'close_writer =>
173+
debug("Closing writer")
174+
ctx.args.writer.close()
175+
'ok
176+
case 'exit_with_deleted =>
177+
exit('deleted)
172178
case 'count_fields =>
173179
countFields
174180
case 'delete =>
@@ -210,7 +216,6 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
210216
}
211217

212218
override def exit(msg: Any) {
213-
debug("Closed with reason: %.1000s".format(msg))
214219
try {
215220
reader.close()
216221
} catch {
@@ -220,7 +225,12 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
220225
ctx.args.writer.rollback()
221226
} catch {
222227
case e: AlreadyClosedException => 'ignored
223-
case e: IOException => warn("Error while closing writer", e)
228+
case e: IOException =>
229+
val dir = ctx.args.writer.getDirectory
230+
if (IndexWriter.isLocked(dir)) {
231+
IndexWriter.unlock(dir);
232+
}
233+
warn("Error while closing writer", e)
224234
} finally {
225235
super.exit(msg)
226236
}

src/main/scala/com/cloudant/clouseau/Utils.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ package com.cloudant.clouseau
1515
import org.apache.lucene.index.Term
1616
import org.apache.lucene.util.BytesRef
1717
import org.apache.lucene.util.NumericUtils
18+
import org.apache.log4j.Logger
19+
import java.io.File
20+
import java.io.IOException
21+
import java.util.Calendar
22+
import java.util.TimeZone
23+
import java.text.SimpleDateFormat
1824

1925
object Utils {
2026

@@ -29,4 +35,33 @@ object Utils {
2935
new BytesRef(string)
3036
}
3137

38+
def rename(rootDir: File, dbName: String, sig: String) {
39+
val logger = Logger.getLogger("clouseau.utils")
40+
val srcParentDir = new File(rootDir, dbName)
41+
val sdf = new SimpleDateFormat("yyyyMMdd'.'HHmmss")
42+
sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
43+
val sdfNow = sdf.format(Calendar.getInstance().getTime())
44+
// move timestamp information in dbName to end of destination path
45+
// for example, from foo.1234567890 to foo.20170912.092828.deleted.1234567890
46+
val destParentPath = dbName.dropRight(10) + sdfNow + ".deleted." + dbName.takeRight(10)
47+
val destParentDir = new File(rootDir, destParentPath)
48+
logger.info("Renaming '%s' to '%s'".format(
49+
srcParentDir.getAbsolutePath, destParentDir.getAbsolutePath)
50+
)
51+
if (!srcParentDir.isDirectory) {
52+
return
53+
}
54+
if (!destParentDir.exists) {
55+
destParentDir.mkdirs
56+
}
57+
58+
val srcDir = new File(srcParentDir, sig)
59+
val destDir = new File(destParentDir, sig)
60+
61+
if (!srcDir.renameTo(destDir)) {
62+
logger.error("Failed to rename directory from '%s' to '%s'".format(
63+
srcDir.getAbsolutePath, destDir.getAbsolutePath))
64+
}
65+
}
66+
3267
}

src/test/scala/com/cloudant/clouseau/IndexCleanupServiceSpec.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ class IndexCleanupServiceSpec extends SpecificationWithJUnit {
2323
"the index clean-up service" should {
2424

2525
"rename index when database is deleted" in new cleanup_service {
26-
node.cast(service, RenamePathMsg("foo.1234567890")) must be equalTo 'ok
26+
node.cast(cleanup, RenamePathMsg("shards/00000000-ffffffff/foo.1234567890")) must be equalTo 'ok
2727
Thread.sleep(1000)
28-
val indexdir = new File("target", "indexes")
28+
val indexdir = new File(new File(new File("target", "indexes"), "shards"), "00000000-ffffffff")
2929
var subdirlist = List[String]()
3030

3131
for (file <- indexdir.listFiles if file.getName contains ".deleted") {
@@ -41,7 +41,9 @@ class IndexCleanupServiceSpec extends SpecificationWithJUnit {
4141
trait cleanup_service extends RunningNode {
4242
val config = new SystemConfiguration()
4343
val args = new ConfigurationArgs(config)
44-
val service = node.spawnService[IndexCleanupService, ConfigurationArgs](args)
44+
val cleanup = node.spawnService[IndexCleanupService, ConfigurationArgs](args)
45+
var manager = node.spawnService[IndexManagerService, ConfigurationArgs]('main, args)
46+
4547
val mbox = node.spawnMbox
4648

4749
val dir = new File("target", "indexes")
@@ -51,7 +53,8 @@ trait cleanup_service extends RunningNode {
5153
}
5254
}
5355

54-
val foodir = new File(new File("target", "indexes"), "foo.1234567890")
56+
val foodir = new File(new File(new File(new File(new File("target", "indexes"), "shards"),
57+
"00000000-ffffffff"), "foo.1234567890"), "5838a59330e52227a58019dc1b9edd6e")
5558
if (!foodir.exists) {
5659
foodir.mkdirs
5760
}

0 commit comments

Comments
 (0)