Skip to content

Commit

Permalink
[WIP] Core: Create commit groups in commit service offer in RewriteDa…
Browse files Browse the repository at this point in the history
…taFilesCommitManager
  • Loading branch information
singhpk234 committed Jan 7, 2023
1 parent 445503f commit 4c373c1
Showing 1 changed file with 39 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -136,6 +137,7 @@ public CommitService service(int rewritesPerCommit) {
public class CommitService implements Closeable {
private final ExecutorService committerService;
private final ConcurrentLinkedQueue<RewriteFileGroup> completedRewrites;
private final ConcurrentLinkedQueue<String> inProgressCommits;
private final List<RewriteFileGroup> committedRewrites;
private final int rewritesPerCommit;
private final AtomicBoolean running = new AtomicBoolean(false);
Expand All @@ -153,6 +155,7 @@ public class CommitService implements Closeable {

completedRewrites = Queues.newConcurrentLinkedQueue();
committedRewrites = Lists.newArrayList();
inProgressCommits = Queues.newConcurrentLinkedQueue();
}

/** Starts a single threaded executor service for handling file group commits. */
Expand All @@ -163,9 +166,9 @@ public void start() {
// Partial progress commit service
committerService.execute(
() -> {
while (running.get() || completedRewrites.size() > 0) {
while (running.get() || completedRewrites.size() > 0 || inProgressCommits.size() > 0) {
try {
if (completedRewrites.size() == 0) {
if (completedRewrites.size() == 0 && inProgressCommits.size() == 0) {
// Give other threads a chance to make progress
Thread.sleep(100);
}
Expand All @@ -174,23 +177,8 @@ public void start() {
throw new RuntimeException("Interrupted while processing commits", e);
}

// Either we have a full commit group, or we have completed writing and need to commit
// what is left over
if (completedRewrites.size() >= rewritesPerCommit
|| (!running.get() && completedRewrites.size() > 0)) {
Set<RewriteFileGroup> batch = Sets.newHashSetWithExpectedSize(rewritesPerCommit);
for (int i = 0; i < rewritesPerCommit && !completedRewrites.isEmpty(); i++) {
batch.add(completedRewrites.poll());
}

try {
commitOrClean(batch);
committedRewrites.addAll(batch);
} catch (Exception e) {
LOG.error(
"Failure during rewrite commit process, partial progress enabled. Ignoring",
e);
}
if (!running.get() && completedRewrites.size() > 0) {
commitReadyCommitGroups();
}
}
});
Expand All @@ -208,6 +196,7 @@ public void offer(RewriteFileGroup group) {
Preconditions.checkState(
running.get(), "Cannot add rewrites to a service which has already been closed");
completedRewrites.add(group);
commitReadyCommitGroups();
}

/** Returns all File groups which have been committed */
Expand Down Expand Up @@ -249,5 +238,36 @@ public void close() {
"File groups offered after service was closed, "
+ "they were not successfully committed.");
}

private void commitReadyCommitGroups() {
if (canCreateCommitGroup()) {
synchronized (completedRewrites) {
if (canCreateCommitGroup()) {
String inProgressCommitToken = UUID.randomUUID().toString();
Set<RewriteFileGroup> batch = Sets.newHashSetWithExpectedSize(rewritesPerCommit);
for (int i = 0; i < rewritesPerCommit && !completedRewrites.isEmpty(); i++) {
batch.add(completedRewrites.poll());
}
inProgressCommits.add(inProgressCommitToken);
try {
commitOrClean(batch);
committedRewrites.addAll(batch);
} catch (Exception e) {
LOG.error(
"Failure during rewrite commit process, partial progress enabled. Ignoring", e);
}
// remove in-progress token
inProgressCommits.remove(inProgressCommitToken);
}
}
}
}

private boolean canCreateCommitGroup() {
// Either we have a full commit group, or we have completed writing and need to commit
// what is left over
return (completedRewrites.size() >= rewritesPerCommit)
|| (!running.get() && completedRewrites.size() > 0);
}
}
}

0 comments on commit 4c373c1

Please sign in to comment.