Skip to content

Commit

Permalink
Refactor journal to always write through contexts
Browse files Browse the repository at this point in the history
Before we would sometimes write through a context and
sometimes write directly. This PR changes the API so
that a context must always be used.

This PR also makes AbstractMaster lighter, moving
MasterJournalContext to its own class and having
journals manage their own AsyncJournalWriters internally.
  • Loading branch information
aaudiber committed Oct 5, 2017
1 parent 8069c25 commit 056b7a4
Show file tree
Hide file tree
Showing 25 changed files with 254 additions and 370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
*/
@ThreadSafe
public enum PreconditionMessage {
ASYNC_JOURNAL_WRITER_NULL("AsyncJournalWriter cannot be null"),
BLOCK_LENGTH_INCONSISTENT("Block %s is expected to be %s bytes, but only %s bytes are available. "
+ "Please ensure its metadata is consistent between Alluxio and UFS."),
COMMAND_LINE_LINEAGE_ONLY("Only command line jobs are supported by createLineage"),
Expand Down
4 changes: 3 additions & 1 deletion core/common/src/main/java/alluxio/resource/LockResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package alluxio.resource;

import java.io.Closeable;
import java.util.concurrent.locks.Lock;

/**
Expand All @@ -22,7 +23,8 @@
* }
* </pre>
*/
public class LockResource implements AutoCloseable {
// extends Closeable instead of AutoCloseable to enable usage with Guava's Closer.
public class LockResource implements Closeable {
private final Lock mLock;

/**
Expand Down
122 changes: 1 addition & 121 deletions core/server/common/src/main/java/alluxio/master/AbstractMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,12 @@

package alluxio.master;

import alluxio.Configuration;
import alluxio.Constants;
import alluxio.PropertyKey;
import alluxio.Server;
import alluxio.clock.Clock;
import alluxio.exception.PreconditionMessage;
import alluxio.master.journal.AsyncJournalWriter;
import alluxio.master.journal.Journal;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalSystem;
import alluxio.proto.journal.Journal.JournalEntry;
import alluxio.retry.RetryPolicy;
import alluxio.retry.TimeoutRetry;
import alluxio.util.executor.ExecutorServiceFactory;

import com.google.common.base.Preconditions;
Expand All @@ -46,10 +39,7 @@
@NotThreadSafe // TODO(jiri): make thread-safe (c.f. ALLUXIO-1664)
public abstract class AbstractMaster implements Master {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMaster.class);
private static final long INVALID_FLUSH_COUNTER = -1;
private static final long SHUTDOWN_TIMEOUT_MS = 10 * Constants.SECOND_MS;
private static final long JOURNAL_FLUSH_RETRY_TIMEOUT_MS =
Configuration.getMs(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS);

/** A factory for creating executor services when they are needed. */
private ExecutorServiceFactory mExecutorServiceFactory;
Expand All @@ -59,8 +49,6 @@ public abstract class AbstractMaster implements Master {
private Journal mJournal;
/** true if this master is in primary mode, and not secondary mode. */
private boolean mIsPrimary = false;
/** The {@link AsyncJournalWriter} for async journal writes. */
private AsyncJournalWriter mAsyncJournalWriter;

/** The clock to use for determining the time. */
protected final Clock mClock;
Expand Down Expand Up @@ -102,7 +90,6 @@ public void start(Boolean isPrimary) throws IOException {
*/

LOG.info("{}: Starting primary master.", getName());
mAsyncJournalWriter = new AsyncJournalWriter(mJournal);
}
}

Expand Down Expand Up @@ -131,75 +118,6 @@ public void stop() throws IOException {
LOG.info("{}: Stopped {} master.", getName(), mIsPrimary ? "primary" : "secondary");
}

/**
* Writes a {@link JournalEntry} to the journal. Does NOT flush the journal.
*
* @param entry the {@link JournalEntry} to write to the journal
*/
protected void writeJournalEntry(JournalEntry entry) {
Preconditions.checkNotNull(mJournal, "Cannot write entry: journal is null.");
try {
mJournal.write(entry);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Flushes the journal.
*/
protected void flushJournal() {
Preconditions.checkNotNull(mJournal, "Cannot flush journal: journal is null.");
try {
mJournal.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Appends a {@link JournalEntry} for writing to the journal.
*
* @param entry the {@link JournalEntry}
* @param journalContext the journal context
*/
protected void appendJournalEntry(JournalEntry entry, JournalContext journalContext) {
Preconditions.checkNotNull(mAsyncJournalWriter, PreconditionMessage.ASYNC_JOURNAL_WRITER_NULL);
journalContext.append(entry);
}

/**
* Waits for the flush counter to be flushed to the journal. If the counter is
* {@link #INVALID_FLUSH_COUNTER}, this is a noop.
*
* @param journalContext the journal context
*/
private void waitForJournalFlush(JournalContext journalContext) {
if (journalContext.getFlushCounter() == INVALID_FLUSH_COUNTER) {
// Check this before the precondition.
return;
}
Preconditions.checkNotNull(mAsyncJournalWriter, PreconditionMessage.ASYNC_JOURNAL_WRITER_NULL);

RetryPolicy retry = new TimeoutRetry(JOURNAL_FLUSH_RETRY_TIMEOUT_MS, Constants.SECOND_MS);
while (retry.attemptRetry()) {
try {
mAsyncJournalWriter.flush(journalContext.getFlushCounter());
return;
} catch (IOException e) {
LOG.warn("Journal flush failed. retrying...", e);
}
}
LOG.error(
"Journal flush failed after {} attempts. Terminating process to prevent inconsistency.",
retry.getRetryCount());
if (Configuration.getBoolean(PropertyKey.TEST_MODE)) {
throw new RuntimeException("Journal flush failed after " + retry.getRetryCount()
+ " attempts. Terminating process to prevent inconsistency.");
}
System.exit(-1);
}

/**
* @return the {@link ExecutorService} for this master
*/
Expand All @@ -211,44 +129,6 @@ protected ExecutorService getExecutorService() {
* @return new instance of {@link JournalContext}
*/
protected JournalContext createJournalContext() {
return new MasterJournalContext(mAsyncJournalWriter);
}

/**
* Context for storing journaling information.
*/
@NotThreadSafe
public final class MasterJournalContext implements JournalContext {
private final AsyncJournalWriter mAsyncJournalWriter;
private long mFlushCounter;

/**
* Constructs a {@link MasterJournalContext}.
*
* @param asyncJournalWriter a {@link AsyncJournalWriter}
*/
private MasterJournalContext(AsyncJournalWriter asyncJournalWriter) {
mAsyncJournalWriter = asyncJournalWriter;
mFlushCounter = INVALID_FLUSH_COUNTER;
}

@Override
public long getFlushCounter() {
return mFlushCounter;
}

@Override
public void append(JournalEntry entry) {
if (mAsyncJournalWriter != null) {
mFlushCounter = mAsyncJournalWriter.appendEntry(entry);
}
}

@Override
public void close() {
if (mAsyncJournalWriter != null) {
waitForJournalFlush(this);
}
}
return mJournal.createJournalContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
@ThreadSafe
public final class AsyncJournalWriter {
private final Journal mJournal;
private final JournalWriter mJournalWriter;
private final ConcurrentLinkedQueue<JournalEntry> mQueue;
/** Represents the count of entries added to the journal queue. */
private final AtomicLong mCounter;
Expand All @@ -53,10 +53,10 @@ public final class AsyncJournalWriter {
/**
* Creates a {@link AsyncJournalWriter}.
*
* @param journal the {@link Journal} to use for writing
* @param journalWriter a journal writer to write to
*/
public AsyncJournalWriter(Journal journal) {
mJournal = Preconditions.checkNotNull(journal, "journal");
public AsyncJournalWriter(JournalWriter journalWriter) {
mJournalWriter = Preconditions.checkNotNull(journalWriter, "journalWriter");
mQueue = new ConcurrentLinkedQueue<>();
mCounter = new AtomicLong(0);
mFlushCounter = new AtomicLong(0);
Expand Down Expand Up @@ -124,7 +124,7 @@ public void flush(final long targetCounter) throws IOException {
// No more entries in the queue. Break out of the infinite for-loop.
break;
}
mJournal.write(entry);
mJournalWriter.write(entry);
// Remove the head entry, after the entry was successfully written.
mQueue.poll();
writeCounter = mWriteCounter.incrementAndGet();
Expand All @@ -138,7 +138,7 @@ public void flush(final long targetCounter) throws IOException {
}
}
}
mJournal.flush();
mJournalWriter.flush();
mFlushCounter.set(writeCounter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@

package alluxio.master.journal;

import alluxio.proto.journal.Journal.JournalEntry;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;

/**
Expand All @@ -27,15 +24,7 @@ public interface Journal extends Closeable {
URI getLocation();

/**
* Writes an entry. {@link #flush} should be called afterwards if we want to make sure the entry
* is persisted.
*
* @param entry the journal entry to write
*/
void write(JournalEntry entry) throws IOException;

/**
* Flushes all the entries written to the underlying storage.
* @return a journal context for appending journal entries
*/
void flush() throws IOException;
JournalContext createJournalContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ public interface JournalContext extends Closeable {
*/
void append(JournalEntry entry);

/**
* @return the journal flush counter
*/
long getFlushCounter();

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public Builder setLocation(URI location) {
mLocation = location;
return this;
}

/**
* @param quietTimeMs before upgrading from SECONDARY to PRIMARY mode, the journal will wait
* until this duration has passed without any journal entries being written.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.journal;

import alluxio.proto.journal.Journal.JournalEntry;

import java.io.IOException;

/**
* Interface for a class that can write and flush journal entries.
*/
public interface JournalWriter {
/**
* Writes an entry. {@link #flush} should be called afterwards if we want to make sure the entry
* is persisted.
*
* @param entry the journal entry to write
*/
void write(JournalEntry entry) throws IOException;

/**
* Flushes all the entries written to the underlying storage.
*/
void flush() throws IOException;
}
Loading

0 comments on commit 056b7a4

Please sign in to comment.