diff --git a/core/common/src/main/java/alluxio/exception/PreconditionMessage.java b/core/common/src/main/java/alluxio/exception/PreconditionMessage.java index 17d7fbf745b8..e54c6520aae0 100644 --- a/core/common/src/main/java/alluxio/exception/PreconditionMessage.java +++ b/core/common/src/main/java/alluxio/exception/PreconditionMessage.java @@ -20,7 +20,6 @@ */ @ThreadSafe public enum PreconditionMessage { - ASYNC_JOURNAL_WRITER_NULL("AsyncJournalWriter cannot be null"), BLOCK_LENGTH_INCONSISTENT("Block %s is expected to be %s bytes, but only %s bytes are available. " + "Please ensure its metadata is consistent between Alluxio and UFS."), COMMAND_LINE_LINEAGE_ONLY("Only command line jobs are supported by createLineage"), diff --git a/core/common/src/main/java/alluxio/resource/LockResource.java b/core/common/src/main/java/alluxio/resource/LockResource.java index fa8ac1312288..6cf4e23d6cc1 100644 --- a/core/common/src/main/java/alluxio/resource/LockResource.java +++ b/core/common/src/main/java/alluxio/resource/LockResource.java @@ -11,6 +11,7 @@ package alluxio.resource; +import java.io.Closeable; import java.util.concurrent.locks.Lock; /** @@ -22,7 +23,8 @@ * } * */ -public class LockResource implements AutoCloseable { +// extends Closeable instead of AutoCloseable to enable usage with Guava's Closer. +public class LockResource implements Closeable { private final Lock mLock; /** diff --git a/core/server/common/src/main/java/alluxio/master/AbstractMaster.java b/core/server/common/src/main/java/alluxio/master/AbstractMaster.java index 35ca2205520e..4003ee43fff1 100644 --- a/core/server/common/src/main/java/alluxio/master/AbstractMaster.java +++ b/core/server/common/src/main/java/alluxio/master/AbstractMaster.java @@ -11,19 +11,12 @@ package alluxio.master; -import alluxio.Configuration; import alluxio.Constants; -import alluxio.PropertyKey; import alluxio.Server; import alluxio.clock.Clock; -import alluxio.exception.PreconditionMessage; -import alluxio.master.journal.AsyncJournalWriter; import alluxio.master.journal.Journal; import alluxio.master.journal.JournalContext; import alluxio.master.journal.JournalSystem; -import alluxio.proto.journal.Journal.JournalEntry; -import alluxio.retry.RetryPolicy; -import alluxio.retry.TimeoutRetry; import alluxio.util.executor.ExecutorServiceFactory; import com.google.common.base.Preconditions; @@ -46,10 +39,7 @@ @NotThreadSafe // TODO(jiri): make thread-safe (c.f. ALLUXIO-1664) public abstract class AbstractMaster implements Master { private static final Logger LOG = LoggerFactory.getLogger(AbstractMaster.class); - private static final long INVALID_FLUSH_COUNTER = -1; private static final long SHUTDOWN_TIMEOUT_MS = 10 * Constants.SECOND_MS; - private static final long JOURNAL_FLUSH_RETRY_TIMEOUT_MS = - Configuration.getMs(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS); /** A factory for creating executor services when they are needed. */ private ExecutorServiceFactory mExecutorServiceFactory; @@ -59,8 +49,6 @@ public abstract class AbstractMaster implements Master { private Journal mJournal; /** true if this master is in primary mode, and not secondary mode. */ private boolean mIsPrimary = false; - /** The {@link AsyncJournalWriter} for async journal writes. */ - private AsyncJournalWriter mAsyncJournalWriter; /** The clock to use for determining the time. */ protected final Clock mClock; @@ -102,7 +90,6 @@ public void start(Boolean isPrimary) throws IOException { */ LOG.info("{}: Starting primary master.", getName()); - mAsyncJournalWriter = new AsyncJournalWriter(mJournal); } } @@ -131,75 +118,6 @@ public void stop() throws IOException { LOG.info("{}: Stopped {} master.", getName(), mIsPrimary ? "primary" : "secondary"); } - /** - * Writes a {@link JournalEntry} to the journal. Does NOT flush the journal. - * - * @param entry the {@link JournalEntry} to write to the journal - */ - protected void writeJournalEntry(JournalEntry entry) { - Preconditions.checkNotNull(mJournal, "Cannot write entry: journal is null."); - try { - mJournal.write(entry); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Flushes the journal. - */ - protected void flushJournal() { - Preconditions.checkNotNull(mJournal, "Cannot flush journal: journal is null."); - try { - mJournal.flush(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Appends a {@link JournalEntry} for writing to the journal. - * - * @param entry the {@link JournalEntry} - * @param journalContext the journal context - */ - protected void appendJournalEntry(JournalEntry entry, JournalContext journalContext) { - Preconditions.checkNotNull(mAsyncJournalWriter, PreconditionMessage.ASYNC_JOURNAL_WRITER_NULL); - journalContext.append(entry); - } - - /** - * Waits for the flush counter to be flushed to the journal. If the counter is - * {@link #INVALID_FLUSH_COUNTER}, this is a noop. - * - * @param journalContext the journal context - */ - private void waitForJournalFlush(JournalContext journalContext) { - if (journalContext.getFlushCounter() == INVALID_FLUSH_COUNTER) { - // Check this before the precondition. - return; - } - Preconditions.checkNotNull(mAsyncJournalWriter, PreconditionMessage.ASYNC_JOURNAL_WRITER_NULL); - - RetryPolicy retry = new TimeoutRetry(JOURNAL_FLUSH_RETRY_TIMEOUT_MS, Constants.SECOND_MS); - while (retry.attemptRetry()) { - try { - mAsyncJournalWriter.flush(journalContext.getFlushCounter()); - return; - } catch (IOException e) { - LOG.warn("Journal flush failed. retrying...", e); - } - } - LOG.error( - "Journal flush failed after {} attempts. Terminating process to prevent inconsistency.", - retry.getRetryCount()); - if (Configuration.getBoolean(PropertyKey.TEST_MODE)) { - throw new RuntimeException("Journal flush failed after " + retry.getRetryCount() - + " attempts. Terminating process to prevent inconsistency."); - } - System.exit(-1); - } - /** * @return the {@link ExecutorService} for this master */ @@ -211,44 +129,6 @@ protected ExecutorService getExecutorService() { * @return new instance of {@link JournalContext} */ protected JournalContext createJournalContext() { - return new MasterJournalContext(mAsyncJournalWriter); - } - - /** - * Context for storing journaling information. - */ - @NotThreadSafe - public final class MasterJournalContext implements JournalContext { - private final AsyncJournalWriter mAsyncJournalWriter; - private long mFlushCounter; - - /** - * Constructs a {@link MasterJournalContext}. - * - * @param asyncJournalWriter a {@link AsyncJournalWriter} - */ - private MasterJournalContext(AsyncJournalWriter asyncJournalWriter) { - mAsyncJournalWriter = asyncJournalWriter; - mFlushCounter = INVALID_FLUSH_COUNTER; - } - - @Override - public long getFlushCounter() { - return mFlushCounter; - } - - @Override - public void append(JournalEntry entry) { - if (mAsyncJournalWriter != null) { - mFlushCounter = mAsyncJournalWriter.appendEntry(entry); - } - } - - @Override - public void close() { - if (mAsyncJournalWriter != null) { - waitForJournalFlush(this); - } - } + return mJournal.createJournalContext(); } } diff --git a/core/server/common/src/main/java/alluxio/master/journal/AsyncJournalWriter.java b/core/server/common/src/main/java/alluxio/master/journal/AsyncJournalWriter.java index d73e34360531..3e244dc46742 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/AsyncJournalWriter.java +++ b/core/server/common/src/main/java/alluxio/master/journal/AsyncJournalWriter.java @@ -30,7 +30,7 @@ */ @ThreadSafe public final class AsyncJournalWriter { - private final Journal mJournal; + private final JournalWriter mJournalWriter; private final ConcurrentLinkedQueue mQueue; /** Represents the count of entries added to the journal queue. */ private final AtomicLong mCounter; @@ -53,10 +53,10 @@ public final class AsyncJournalWriter { /** * Creates a {@link AsyncJournalWriter}. * - * @param journal the {@link Journal} to use for writing + * @param journalWriter a journal writer to write to */ - public AsyncJournalWriter(Journal journal) { - mJournal = Preconditions.checkNotNull(journal, "journal"); + public AsyncJournalWriter(JournalWriter journalWriter) { + mJournalWriter = Preconditions.checkNotNull(journalWriter, "journalWriter"); mQueue = new ConcurrentLinkedQueue<>(); mCounter = new AtomicLong(0); mFlushCounter = new AtomicLong(0); @@ -124,7 +124,7 @@ public void flush(final long targetCounter) throws IOException { // No more entries in the queue. Break out of the infinite for-loop. break; } - mJournal.write(entry); + mJournalWriter.write(entry); // Remove the head entry, after the entry was successfully written. mQueue.poll(); writeCounter = mWriteCounter.incrementAndGet(); @@ -138,7 +138,7 @@ public void flush(final long targetCounter) throws IOException { } } } - mJournal.flush(); + mJournalWriter.flush(); mFlushCounter.set(writeCounter); } } diff --git a/core/server/common/src/main/java/alluxio/master/journal/Journal.java b/core/server/common/src/main/java/alluxio/master/journal/Journal.java index 48d24ce6ccbc..e05ab374d0c7 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/Journal.java +++ b/core/server/common/src/main/java/alluxio/master/journal/Journal.java @@ -11,10 +11,7 @@ package alluxio.master.journal; -import alluxio.proto.journal.Journal.JournalEntry; - import java.io.Closeable; -import java.io.IOException; import java.net.URI; /** @@ -27,15 +24,7 @@ public interface Journal extends Closeable { URI getLocation(); /** - * Writes an entry. {@link #flush} should be called afterwards if we want to make sure the entry - * is persisted. - * - * @param entry the journal entry to write - */ - void write(JournalEntry entry) throws IOException; - - /** - * Flushes all the entries written to the underlying storage. + * @return a journal context for appending journal entries */ - void flush() throws IOException; + JournalContext createJournalContext(); } diff --git a/core/server/common/src/main/java/alluxio/master/journal/JournalContext.java b/core/server/common/src/main/java/alluxio/master/journal/JournalContext.java index 2e7c21bbe42d..6f19b2814faf 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/JournalContext.java +++ b/core/server/common/src/main/java/alluxio/master/journal/JournalContext.java @@ -24,11 +24,6 @@ public interface JournalContext extends Closeable { */ void append(JournalEntry entry); - /** - * @return the journal flush counter - */ - long getFlushCounter(); - @Override void close(); } diff --git a/core/server/common/src/main/java/alluxio/master/journal/JournalSystem.java b/core/server/common/src/main/java/alluxio/master/journal/JournalSystem.java index efa912e0ad30..43b049f04c9e 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/JournalSystem.java +++ b/core/server/common/src/main/java/alluxio/master/journal/JournalSystem.java @@ -163,6 +163,7 @@ public Builder setLocation(URI location) { mLocation = location; return this; } + /** * @param quietTimeMs before upgrading from SECONDARY to PRIMARY mode, the journal will wait * until this duration has passed without any journal entries being written. diff --git a/core/server/common/src/main/java/alluxio/master/journal/JournalWriter.java b/core/server/common/src/main/java/alluxio/master/journal/JournalWriter.java new file mode 100644 index 000000000000..c92927840d56 --- /dev/null +++ b/core/server/common/src/main/java/alluxio/master/journal/JournalWriter.java @@ -0,0 +1,34 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.master.journal; + +import alluxio.proto.journal.Journal.JournalEntry; + +import java.io.IOException; + +/** + * Interface for a class that can write and flush journal entries. + */ +public interface JournalWriter { + /** + * Writes an entry. {@link #flush} should be called afterwards if we want to make sure the entry + * is persisted. + * + * @param entry the journal entry to write + */ + void write(JournalEntry entry) throws IOException; + + /** + * Flushes all the entries written to the underlying storage. + */ + void flush() throws IOException; +} diff --git a/core/server/common/src/main/java/alluxio/master/journal/MasterJournalContext.java b/core/server/common/src/main/java/alluxio/master/journal/MasterJournalContext.java new file mode 100644 index 000000000000..fb68429ad09f --- /dev/null +++ b/core/server/common/src/main/java/alluxio/master/journal/MasterJournalContext.java @@ -0,0 +1,97 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.master.journal; + +import alluxio.Configuration; +import alluxio.Constants; +import alluxio.PropertyKey; +import alluxio.proto.journal.Journal.JournalEntry; +import alluxio.retry.RetryPolicy; +import alluxio.retry.TimeoutRetry; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import javax.annotation.concurrent.NotThreadSafe; + +/** + * Context for storing master journal information. + */ +@NotThreadSafe +public final class MasterJournalContext implements JournalContext { + private static final Logger LOG = LoggerFactory.getLogger(MasterJournalContext.class); + private static final long INVALID_FLUSH_COUNTER = -1; + private static final long JOURNAL_FLUSH_RETRY_TIMEOUT_MS = + Configuration.getMs(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS); + + private final AsyncJournalWriter mAsyncJournalWriter; + private long mFlushCounter; + + /** + * Constructs a {@link MasterJournalContext}. + * + * @param asyncJournalWriter a {@link AsyncJournalWriter} + */ + public MasterJournalContext(AsyncJournalWriter asyncJournalWriter) { + Preconditions.checkNotNull(asyncJournalWriter, "asyncJournalWriter"); + mAsyncJournalWriter = asyncJournalWriter; + mFlushCounter = INVALID_FLUSH_COUNTER; + } + + @Override + public void append(JournalEntry entry) { + mFlushCounter = mAsyncJournalWriter.appendEntry(entry); + } + + /** + * Waits for the flush counter to be flushed to the journal. If the counter is + * {@link #INVALID_FLUSH_COUNTER}, this is a noop. + */ + private void waitForJournalFlush() { + if (mFlushCounter == INVALID_FLUSH_COUNTER) { + // Check this before the precondition. + return; + } + + RetryPolicy retry = new TimeoutRetry(JOURNAL_FLUSH_RETRY_TIMEOUT_MS, Constants.SECOND_MS); + while (retry.attemptRetry()) { + try { + mAsyncJournalWriter.flush(mFlushCounter); + return; + } catch (IOException e) { + LOG.warn("Journal flush failed. retrying...", e); + } catch (Throwable e) { + LOG.error("Journal flush failed. Terminating process to prevent inconsistency.", e); + if (Configuration.getBoolean(PropertyKey.TEST_MODE)) { + throw new RuntimeException("Journal flush failed", e); + } + System.exit(-1); + } + } + LOG.error( + "Journal flush failed after {} attempts. Terminating process to prevent inconsistency.", + retry.getRetryCount()); + if (Configuration.getBoolean(PropertyKey.TEST_MODE)) { + throw new RuntimeException("Journal flush failed after " + retry.getRetryCount() + + " attempts. Terminating process to prevent inconsistency."); + } + System.exit(-1); + } + + @Override + public void close() { + waitForJournalFlush(); + } +} diff --git a/core/server/common/src/main/java/alluxio/master/journal/NoopJournalContext.java b/core/server/common/src/main/java/alluxio/master/journal/NoopJournalContext.java index 2097cc11a9ba..775f9dfdb255 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/NoopJournalContext.java +++ b/core/server/common/src/main/java/alluxio/master/journal/NoopJournalContext.java @@ -34,11 +34,6 @@ public void append(JournalEntry entry) { // Do nothing } - @Override - public long getFlushCounter() { - return -1; - } - @Override public void close() { // Do nothing diff --git a/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournal.java b/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournal.java deleted file mode 100644 index a0cfde5e23ea..000000000000 --- a/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournal.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 - * (the "License"). You may not use this work except in compliance with the License, which is - * available at www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied, as more fully set forth in the License. - * - * See the NOTICE file distributed with this work for information regarding copyright ownership. - */ - -package alluxio.master.journal.noop; - -import alluxio.master.journal.Journal; -import alluxio.proto.journal.Journal.JournalEntry; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -/** - * Implementation of {@link Journal} that does nothing. - */ -public class NoopJournal implements Journal { - - /** - * Creates a new instance of {@link NoopJournal}. - */ - public NoopJournal() {} - - @Override - public URI getLocation() { - try { - return new URI("/noop"); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - - @Override - public void write(JournalEntry entry) throws IOException {} - - @Override - public void flush() throws IOException {} - - @Override - public void close() throws IOException {} -} diff --git a/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournalReader.java b/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournalReader.java deleted file mode 100644 index 4592a9b017b5..000000000000 --- a/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournalReader.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 - * (the "License"). You may not use this work except in compliance with the License, which is - * available at www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied, as more fully set forth in the License. - * - * See the NOTICE file distributed with this work for information regarding copyright ownership. - */ - -package alluxio.master.journal.noop; - -import alluxio.exception.InvalidJournalEntryException; -import alluxio.master.journal.JournalReader; -import alluxio.proto.journal.Journal; - -import java.io.IOException; - -/** - * Implementation of {@link JournalReader} that does nothing. - */ -public class NoopJournalReader implements JournalReader { - - /** - * Creates a new instance of {@link NoopJournalReader}. - */ - public NoopJournalReader() {} - - @Override - public Journal.JournalEntry read() throws IOException, InvalidJournalEntryException { - return null; - } - - @Override - public long getNextSequenceNumber() { - return 0; - } - - @Override - public void close() throws IOException {} -} diff --git a/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournalSystem.java b/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournalSystem.java deleted file mode 100644 index c6bdd4c1dd4d..000000000000 --- a/core/server/common/src/main/java/alluxio/master/journal/noop/NoopJournalSystem.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 - * (the "License"). You may not use this work except in compliance with the License, which is - * available at www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied, as more fully set forth in the License. - * - * See the NOTICE file distributed with this work for information regarding copyright ownership. - */ - -package alluxio.master.journal.noop; - -import alluxio.master.journal.Journal; -import alluxio.master.journal.JournalEntryStateMachine; -import alluxio.master.journal.JournalSystem; - -import java.io.IOException; - -/** - * Journal system which doesn't do anything. - */ -public final class NoopJournalSystem implements JournalSystem { - /** - * Constructs a new {@link NoopJournalSystem}. - */ - public NoopJournalSystem() {} - - @Override - public Journal createJournal(JournalEntryStateMachine master) { - return new NoopJournal(); - } - - @Override - public void setMode(Mode mode) {} - - @Override - public boolean isFormatted() throws IOException { - return true; - } - - @Override - public void format() throws IOException {} - - @Override - public void start() throws InterruptedException, IOException {} - - @Override - public void stop() throws InterruptedException, IOException {} -} diff --git a/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournal.java b/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournal.java index fbe6a7df8099..da9ae12bd764 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournal.java +++ b/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournal.java @@ -14,9 +14,13 @@ import alluxio.Configuration; import alluxio.PropertyKey; import alluxio.exception.InvalidJournalEntryException; +import alluxio.master.journal.AsyncJournalWriter; import alluxio.master.journal.Journal; +import alluxio.master.journal.JournalContext; import alluxio.master.journal.JournalEntryStateMachine; import alluxio.master.journal.JournalReader; +import alluxio.master.journal.MasterJournalContext; +import alluxio.master.journal.NoopJournalContext; import alluxio.proto.journal.Journal.JournalEntry; import alluxio.underfs.UfsStatus; import alluxio.underfs.UnderFileSystem; @@ -83,6 +87,8 @@ public class UfsJournal implements Journal { private final long mQuietPeriodMs; /** The current log writer. Null when in secondary mode. */ private UfsJournalLogWriter mWriter; + /** Asynchronous journal writer. */ + private AsyncJournalWriter mAsyncWriter; /** * Thread for tailing the journal, taking snapshots, and applying updates to the state machine. * Null when in primary mode. @@ -137,16 +143,28 @@ public URI getLocation() { return mLocation; } - @Override + /** + * @param entry an entry to write to the journal + */ public void write(JournalEntry entry) throws IOException { writer().write(entry); } - @Override + /** + * Flushes the journal. + */ public void flush() throws IOException { writer().flush(); } + @Override + public JournalContext createJournalContext() { + if (mAsyncWriter == null) { + return new NoopJournalContext(); + } + return new MasterJournalContext(mAsyncWriter); + } + private UfsJournalLogWriter writer() throws IOException { if (mWriter == null) { throw new IllegalStateException("Cannot write to the journal in secondary mode"); @@ -176,6 +194,7 @@ public void gainPrimacy() throws IOException { mTailerThread = null; nextSequenceNumber = catchUp(nextSequenceNumber); mWriter = new UfsJournalLogWriter(this, nextSequenceNumber); + mAsyncWriter = new AsyncJournalWriter(mWriter); } /** @@ -187,6 +206,7 @@ public void losePrimacy() throws IOException { Preconditions.checkState(mTailerThread == null, "tailer thread must be null in primary mode"); mWriter.close(); mWriter = null; + mAsyncWriter = null; mMaster.resetState(); mTailerThread = new UfsJournalCheckpointThread(mMaster, this); mTailerThread.start(); @@ -327,6 +347,7 @@ public void close() throws IOException { if (mWriter != null) { mWriter.close(); mWriter = null; + mAsyncWriter = null; } if (mTailerThread != null) { mTailerThread.awaitTermination(false); diff --git a/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java b/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java index 2a99e6cf3ea7..923d4db43d02 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java +++ b/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java @@ -15,6 +15,7 @@ import alluxio.PropertyKey; import alluxio.RuntimeConstants; import alluxio.exception.ExceptionMessage; +import alluxio.master.journal.JournalWriter; import alluxio.proto.journal.Journal.JournalEntry; import alluxio.underfs.UnderFileSystem; import alluxio.underfs.options.CreateOptions; @@ -42,7 +43,7 @@ * writer is closed. */ @ThreadSafe -final class UfsJournalLogWriter { +final class UfsJournalLogWriter implements JournalWriter { private static final Logger LOG = LoggerFactory.getLogger(UfsJournalLogWriter.class); private final UfsJournal mJournal; diff --git a/core/server/common/src/test/java/alluxio/master/journal/AsyncJournalWriterTest.java b/core/server/common/src/test/java/alluxio/master/journal/AsyncJournalWriterTest.java index 11cb050afb84..844956002020 100644 --- a/core/server/common/src/test/java/alluxio/master/journal/AsyncJournalWriterTest.java +++ b/core/server/common/src/test/java/alluxio/master/journal/AsyncJournalWriterTest.java @@ -35,7 +35,7 @@ */ public class AsyncJournalWriterTest { - private Journal mMockJournal; + private JournalWriter mMockJournalWriter; private AsyncJournalWriter mAsyncJournalWriter; @After @@ -50,10 +50,10 @@ private void setupAsyncJournalWriter(boolean batchingEnabled) throws Exception { Configuration.set(PropertyKey.MASTER_JOURNAL_FLUSH_BATCH_TIME_MS, "0ms"); } - mMockJournal = PowerMockito.mock(Journal.class); - doNothing().when(mMockJournal).write(any(JournalEntry.class)); - doNothing().when(mMockJournal).flush(); - mAsyncJournalWriter = new AsyncJournalWriter(mMockJournal); + mMockJournalWriter = PowerMockito.mock(JournalWriter.class); + doNothing().when(mMockJournalWriter).write(any(JournalEntry.class)); + doNothing().when(mMockJournalWriter).flush(); + mAsyncJournalWriter = new AsyncJournalWriter(mMockJournalWriter); } /** @@ -75,9 +75,9 @@ public void writesAndFlushesInternal(boolean batchingEnabled) throws Exception { mAsyncJournalWriter.flush(i); } if (batchingEnabled) { - Mockito.verify(mMockJournal, atLeastOnce()).flush(); + Mockito.verify(mMockJournalWriter, atLeastOnce()).flush(); } else { - Mockito.verify(mMockJournal, times(entries)).flush(); + Mockito.verify(mMockJournalWriter, times(entries)).flush(); } } @@ -107,7 +107,7 @@ public void failedWriteInternal(boolean batchingEnabled) throws Exception { } // Start failing journal writes. - doThrow(new IOException("entry write failed")).when(mMockJournal) + doThrow(new IOException("entry write failed")).when(mMockJournalWriter) .write(any(JournalEntry.class)); // Flushes should fail. @@ -121,16 +121,16 @@ public void failedWriteInternal(boolean batchingEnabled) throws Exception { } // Allow journal writes to succeed. - doNothing().when(mMockJournal).write(any(JournalEntry.class)); + doNothing().when(mMockJournalWriter).write(any(JournalEntry.class)); // Flushes should succeed. for (int i = 1; i <= entries; i++) { mAsyncJournalWriter.flush(i); } if (batchingEnabled) { - Mockito.verify(mMockJournal, atLeastOnce()).flush(); + Mockito.verify(mMockJournalWriter, atLeastOnce()).flush(); } else { - Mockito.verify(mMockJournal, times(entries)).flush(); + Mockito.verify(mMockJournalWriter, times(entries)).flush(); } } @@ -160,7 +160,7 @@ public void failedFlushInternal(boolean batchingEnabled) throws Exception { } // Start failing journal flushes. - doThrow(new IOException("flush failed")).when(mMockJournal).flush(); + doThrow(new IOException("flush failed")).when(mMockJournalWriter).flush(); // Flushes should fail. for (int i = 1; i <= entries; i++) { @@ -173,17 +173,17 @@ public void failedFlushInternal(boolean batchingEnabled) throws Exception { } // Allow journal flushes to succeed. - doNothing().when(mMockJournal).flush(); + doNothing().when(mMockJournalWriter).flush(); // Flushes should succeed. for (int i = 1; i <= entries; i++) { mAsyncJournalWriter.flush(i); } if (batchingEnabled) { - Mockito.verify(mMockJournal, atLeastOnce()).flush(); + Mockito.verify(mMockJournalWriter, atLeastOnce()).flush(); } else { // The first half of the calls were the failed flush calls. - Mockito.verify(mMockJournal, times(2 * entries)).flush(); + Mockito.verify(mMockJournalWriter, times(2 * entries)).flush(); } } diff --git a/core/server/master/src/main/java/alluxio/master/AlluxioMaster.java b/core/server/master/src/main/java/alluxio/master/AlluxioMaster.java index f7aa774607c5..ae53496a4555 100644 --- a/core/server/master/src/main/java/alluxio/master/AlluxioMaster.java +++ b/core/server/master/src/main/java/alluxio/master/AlluxioMaster.java @@ -38,7 +38,16 @@ public static void main(String[] args) { System.exit(-1); } - MasterProcess process = MasterProcess.Factory.create(); + MasterProcess process; + try { + process = MasterProcess.Factory.create(); + } catch (Throwable t) { + LOG.error("Failed to create master process", t); + // Exit to stop any non-daemon threads. + System.exit(-1); + throw t; + } + ProcessUtils.run(process); } diff --git a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java index bf8ffeac20c4..dd4cefa1b5bf 100644 --- a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java +++ b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java @@ -356,7 +356,7 @@ public void removeBlocks(List blockIds, boolean delete) { if (mBlocks.remove(blockId) != null) { JournalEntry entry = JournalEntry.newBuilder() .setDeleteBlock(DeleteBlockEntry.newBuilder().setBlockId(blockId)).build(); - appendJournalEntry(entry, journalContext); + journalContext.append(entry); } } } @@ -401,7 +401,7 @@ public long getNewContainerId() { try (JournalContext journalContext = createJournalContext()) { // This must be flushed while holding the lock on mBlockContainerIdGenerator, in order to // prevent subsequent calls to return ids that have not been journaled and flushed. - appendJournalEntry(getContainerIdJournalEntry(), journalContext); + journalContext.append(getContainerIdJournalEntry()); } return containerId; } @@ -464,8 +464,7 @@ public void commitBlock(long workerId, long usedBytesOnTier, String tierAlias, l if (writeJournal) { BlockInfoEntry blockInfo = BlockInfoEntry.newBuilder().setBlockId(blockId).setLength(length).build(); - appendJournalEntry(JournalEntry.newBuilder().setBlockInfo(blockInfo).build(), - journalContext); + journalContext.append(JournalEntry.newBuilder().setBlockInfo(blockInfo).build()); } // At this point, both the worker and the block metadata are locked. @@ -503,8 +502,7 @@ public void commitBlockInUFS(long blockId, long length) { // Successfully added the new block metadata. Append a journal entry for the new metadata. BlockInfoEntry blockInfo = BlockInfoEntry.newBuilder().setBlockId(blockId).setLength(length).build(); - appendJournalEntry(JournalEntry.newBuilder().setBlockInfo(blockInfo).build(), - journalContext); + journalContext.append(JournalEntry.newBuilder().setBlockInfo(blockInfo).build()); } } } diff --git a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java index 67ac42a6db1f..c7f9799740aa 100644 --- a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java +++ b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java @@ -143,7 +143,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -1042,8 +1041,7 @@ private void completeFileAndJournal(LockedInodePath inodePath, CompleteFileOptio CompleteFileEntry completeFileEntry = CompleteFileEntry.newBuilder().addAllBlockIds(fileInode.getBlockIds()).setId(inode.getId()) .setLength(length).setOpTimeMs(options.getOperationTimeMs()).build(); - appendJournalEntry(JournalEntry.newBuilder().setCompleteFile(completeFileEntry).build(), - journalContext); + journalContext.append(JournalEntry.newBuilder().setCompleteFile(completeFileEntry).build()); } /** @@ -1192,8 +1190,8 @@ public long reinitializeFile(AlluxioURI path, long blockSizeBytes, long ttl, Ttl ReinitializeFileEntry.newBuilder().setPath(path.getPath()) .setBlockSizeBytes(blockSizeBytes).setTtl(ttl) .setTtlAction(ProtobufUtils.toProtobuf(ttlAction)).build(); - appendJournalEntry( - JournalEntry.newBuilder().setReinitializeFile(reinitializeFile).build(), journalContext); + journalContext + .append(JournalEntry.newBuilder().setReinitializeFile(reinitializeFile).build()); return id; } } @@ -1927,7 +1925,7 @@ private void renameAndJournal(LockedInodePath srcInodePath, LockedInodePath dstI RenameEntry rename = RenameEntry.newBuilder().setId(srcInode.getId()).setDstPath(dstInodePath.getUri().getPath()) .setOpTimeMs(options.getOperationTimeMs()).build(); - appendJournalEntry(JournalEntry.newBuilder().setRename(rename).build(), journalContext); + journalContext.append(JournalEntry.newBuilder().setRename(rename).build()); } /** @@ -2139,8 +2137,8 @@ private void journalPersistedInodes(List> persistedInodes, for (Inode inode : persistedInodes) { PersistDirectoryEntry persistDirectory = PersistDirectoryEntry.newBuilder().setId(inode.getId()).build(); - appendJournalEntry(JournalEntry.newBuilder().setPersistDirectory(persistDirectory).build(), - journalContext); + journalContext + .append(JournalEntry.newBuilder().setPersistDirectory(persistDirectory).build()); } } @@ -2602,8 +2600,7 @@ private void mountAndJournal(LockedInodePath inodePath, AlluxioURI ufsPath, Moun .setUfsPath(ufsPath.toString()).setMountId(mountId) .setReadOnly(options.isReadOnly()) .addAllProperties(protoProperties).setShared(options.isShared()).build(); - appendJournalEntry(JournalEntry.newBuilder().setAddMountPoint(addMountPoint).build(), - journalContext); + journalContext.append(JournalEntry.newBuilder().setAddMountPoint(addMountPoint).build()); } /** @@ -2731,8 +2728,7 @@ private List> unmountAndJournal(LockedInodePath inodePath, JournalConte } DeleteMountPointEntry deleteMountPoint = DeleteMountPointEntry.newBuilder().setAlluxioPath(inodePath.getUri().toString()).build(); - appendJournalEntry(JournalEntry.newBuilder().setDeleteMountPoint(deleteMountPoint).build(), - journalContext); + journalContext.append(JournalEntry.newBuilder().setDeleteMountPoint(deleteMountPoint).build()); return deletedInodes; } @@ -2906,7 +2902,7 @@ private void journalSetAttribute(LockedInodePath inodePath, long opTimeMs, if (options.getMode() != Constants.INVALID_MODE) { builder.setPermission(options.getMode()); } - appendJournalEntry(JournalEntry.newBuilder().setSetAttribute(builder).build(), journalContext); + journalContext.append(JournalEntry.newBuilder().setSetAttribute(builder).build()); } @Override @@ -2934,9 +2930,9 @@ private void scheduleAsyncPersistenceAndJournal(LockedInodePath inodePath, // write to journal AsyncPersistRequestEntry asyncPersistRequestEntry = AsyncPersistRequestEntry.newBuilder().setFileId(fileId).build(); - appendJournalEntry( - JournalEntry.newBuilder().setAsyncPersistRequest(asyncPersistRequestEntry).build(), - journalContext); + journalContext + .append(JournalEntry.newBuilder().setAsyncPersistRequest(asyncPersistRequestEntry).build()); + } /** diff --git a/core/server/master/src/main/java/alluxio/master/lineage/DefaultLineageMaster.java b/core/server/master/src/main/java/alluxio/master/lineage/DefaultLineageMaster.java index 39fcf3b8fcd9..8c912565ef2b 100644 --- a/core/server/master/src/main/java/alluxio/master/lineage/DefaultLineageMaster.java +++ b/core/server/master/src/main/java/alluxio/master/lineage/DefaultLineageMaster.java @@ -33,6 +33,7 @@ import alluxio.master.AbstractMaster; import alluxio.master.file.FileSystemMaster; import alluxio.master.file.options.CreateFileOptions; +import alluxio.master.journal.JournalContext; import alluxio.master.journal.JournalSystem; import alluxio.master.lineage.checkpoint.CheckpointPlan; import alluxio.master.lineage.checkpoint.CheckpointSchedulingExecutor; @@ -203,22 +204,24 @@ public synchronized long createLineage(List inputFiles, List - diff --git a/minicluster/src/main/java/alluxio/master/LocalAlluxioMaster.java b/minicluster/src/main/java/alluxio/master/LocalAlluxioMaster.java index 2fc348fa4beb..bf9dfb4ebd83 100644 --- a/minicluster/src/main/java/alluxio/master/LocalAlluxioMaster.java +++ b/minicluster/src/main/java/alluxio/master/LocalAlluxioMaster.java @@ -113,7 +113,6 @@ public void run() { mMasterThread.setName("MasterThread-" + System.identityHashCode(mMasterThread)); mMasterThread.start(); mMasterProcess.waitForReady(); - mSecondaryMaster = new AlluxioSecondaryMaster(); Runnable runSecondaryMaster = new Runnable() { @Override @@ -148,8 +147,8 @@ public boolean isServing() { * Stops the master processes and cleans up client connections. */ public void stop() throws Exception { - mSecondaryMaster.stop(); if (mSecondaryMasterThread != null) { + mSecondaryMaster.stop(); while (mSecondaryMasterThread.isAlive()) { LOG.info("Stopping thread {}.", mSecondaryMasterThread.getName()); mSecondaryMasterThread.interrupt(); @@ -157,8 +156,8 @@ public void stop() throws Exception { } mSecondaryMasterThread = null; } - mMasterProcess.stop(); if (mMasterThread != null) { + mMasterProcess.stop(); while (mMasterThread.isAlive()) { LOG.info("Stopping thread {}.", mMasterThread.getName()); mMasterThread.interrupt(); diff --git a/pom.xml b/pom.xml index 17931253d3ab..aad608b664ba 100644 --- a/pom.xml +++ b/pom.xml @@ -688,7 +688,7 @@ powermock-module-junit4-rule ${powermock.version} test - + org.powermock powermock-reflect diff --git a/tests/src/test/java/alluxio/master/MockMaster.java b/tests/src/test/java/alluxio/master/MockMaster.java index 83ce38c336c6..f828186bcfbf 100644 --- a/tests/src/test/java/alluxio/master/MockMaster.java +++ b/tests/src/test/java/alluxio/master/MockMaster.java @@ -13,6 +13,7 @@ import alluxio.Server; import alluxio.proto.journal.Journal; +import alluxio.proto.journal.Journal.JournalEntry; import org.apache.thrift.TProcessor; @@ -22,13 +23,14 @@ import java.util.Map; import java.util.Queue; import java.util.Set; + import javax.annotation.Nullable; /** * A fake master implementation. */ public final class MockMaster implements Master { - private Queue mEntries; + private Queue mEntries; public MockMaster() { mEntries = new ArrayDeque<>();