diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieFileChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieFileChannel.java new file mode 100644 index 00000000000..776c35eaf87 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieFileChannel.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.bookie; + +import java.io.File; +import java.io.FileDescriptor; +import java.io.IOException; +import java.nio.channels.FileChannel; + +/** + * A FileChannel for the JournalChannel read and write, we can use this interface to extend the FileChannel + * which we use in the JournalChannel. + */ +interface BookieFileChannel { + + /** + * An interface for get the FileChannel from the provider. + * @return + */ + FileChannel getFileChannel(); + + /** + * Check the given file if exists. + * + * @param file + * @return + */ + boolean fileExists(File file); + + /** + * Get the file descriptor of the opened file. + * + * @return + * @throws IOException + */ + FileDescriptor getFD() throws IOException; +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannelProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannelProvider.java new file mode 100644 index 00000000000..f367042f0b0 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultFileChannelProvider.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.bookie; + +import org.apache.bookkeeper.conf.ServerConfiguration; + +import java.io.File; +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +public class DefaultFileChannelProvider implements FileChannelProvider{ + @Override + public BookieFileChannel open(File file, ServerConfiguration configuration) throws IOException { + return new DefaultFileChannel(file, configuration); + } + + static class DefaultFileChannel implements BookieFileChannel { + private final File file; + private final RandomAccessFile randomAccessFile; + private final ServerConfiguration configuration; + + DefaultFileChannel(File file, ServerConfiguration serverConfiguration) throws IOException { + this.file = file; + this.randomAccessFile = new RandomAccessFile(file, "rw"); + this.configuration = serverConfiguration; + } + + @Override + public FileChannel getFileChannel() { + return randomAccessFile.getChannel(); + } + + @Override + public boolean fileExists(File file) { + return file.exists(); + } + + @Override + public FileDescriptor getFD() throws IOException { + return randomAccessFile.getFD(); + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelProvider.java new file mode 100644 index 00000000000..bcf9722e510 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileChannelProvider.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.bookie; + +import org.apache.bookkeeper.conf.ServerConfiguration; + +import java.io.File; +import java.io.IOException; + +/** + * An interface of the FileChannelProvider. + */ +public interface FileChannelProvider { + static FileChannelProvider newProvider(String providerClassName) throws IOException { + try { + Class providerClass = Class.forName(providerClassName); + Object obj = providerClass.newInstance(); + return (FileChannelProvider) obj; + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Get the BookieFileChannel with the given file and configuration. + * + * @param file + * @param configuration + * @return + * @throws IOException + */ + BookieFileChannel open(File file, ServerConfiguration configuration) throws IOException; +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 1d35b3296f6..4afdc553705 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -782,10 +782,10 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner) throws IOException { JournalChannel recLog; if (journalPos <= 0) { - recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize); + recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, conf); } else { recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, - journalPos); + journalPos, conf); } int journalVersion = recLog.getFormatVersion(); try { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java index 5434d168d4f..04d789083ba 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java @@ -33,6 +33,7 @@ import java.nio.channels.FileChannel; import java.util.Arrays; +import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.util.NativeIO; import org.apache.bookkeeper.util.ZeroBuffer; import org.slf4j.Logger; @@ -45,7 +46,7 @@ class JournalChannel implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(JournalChannel.class); - final RandomAccessFile randomAccessFile; + final BookieFileChannel channel; final int fd; final FileChannel fc; final BufferedChannel bc; @@ -85,21 +86,23 @@ class JournalChannel implements Closeable { // The position of the file channel's last drop position private long lastDropPosition = 0L; + final ServerConfiguration configuration; + // Mostly used by tests JournalChannel(File journalDirectory, long logId) throws IOException { - this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE); + this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE, new ServerConfiguration()); } // Open journal for scanning starting from the first record in journal. - JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException { - this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE); + JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, ServerConfiguration conf) throws IOException { + this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, conf); } // Open journal for scanning starting from given position. JournalChannel(File journalDirectory, long logId, - long preAllocSize, int writeBufferSize, long position) throws IOException { + long preAllocSize, int writeBufferSize, long position, ServerConfiguration conf) throws IOException { this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, - position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER); + position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER, conf); } // Open journal to write @@ -115,7 +118,7 @@ class JournalChannel implements Closeable { boolean fRemoveFromPageCache, int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException { this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize, - START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder); + START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder, new ServerConfiguration()); } /** @@ -143,27 +146,39 @@ class JournalChannel implements Closeable { private JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, int journalAlignSize, long position, boolean fRemoveFromPageCache, - int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException { + int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder, + ServerConfiguration configuration) throws IOException { this.journalAlignSize = journalAlignSize; this.zeros = ByteBuffer.allocate(journalAlignSize); this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize; this.fRemoveFromPageCache = fRemoveFromPageCache; + this.configuration = configuration; + File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn"); + FileChannelProvider provider; + // Create the file channel provider with given configuration. Fallback to use default FileChannel if + // load failed. + try { + provider = FileChannelProvider.newProvider(configuration.getJournalChannelProvider()); + } catch (IOException e) { + LOG.warn("Failed to load journal channel provider '{}', fallback to the default JournalChannel", configuration.getJournalChannelProvider(), e); + provider = new DefaultFileChannelProvider(); + } + channel = provider.open(fn, configuration); if (formatVersionToWrite < V4) { throw new IOException("Invalid journal format to write : version = " + formatVersionToWrite); } LOG.info("Opening journal {}", fn); - if (!fn.exists()) { // new file, write version + if (!channel.fileExists(fn)) { // new file, write version if (!fn.createNewFile()) { LOG.error("Journal file {}, that shouldn't exist, already exists. " + " is there another bookie process running?", fn); throw new IOException("File " + fn + " suddenly appeared, is another bookie process running?"); } - randomAccessFile = new RandomAccessFile(fn, "rw"); - fc = openFileChannel(randomAccessFile); + fc = channel.getFileChannel(); formatVersion = formatVersionToWrite; int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE; @@ -180,8 +195,7 @@ private JournalChannel(File journalDirectory, long logId, nextPrealloc = this.preAllocSize; fc.write(zeros, nextPrealloc - journalAlignSize); } else { // open an existing file - randomAccessFile = new RandomAccessFile(fn, "r"); - fc = openFileChannel(randomAccessFile); + fc = channel.getFileChannel(); bc = null; // readonly ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE); @@ -231,7 +245,7 @@ private JournalChannel(File journalDirectory, long logId, } } if (fRemoveFromPageCache) { - this.fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD()); + this.fd = NativeIO.getSysFileDescriptor(channel.getFD()); } else { this.fd = -1; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 00ef0281ff1..89501dee456 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -146,6 +146,7 @@ public class ServerConfiguration extends AbstractConfiguration