Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
Abstract the FileChanne in the JournalChannel
Browse files Browse the repository at this point in the history
**Motivation**

Make the FileChannel in the JournalChannel can use different implement.
We found we can use [pmemstore](https://github.com/4paradigm/pmemstore)
as the JournalChannel read from. So abstract the FileChannel in the
JournnalChannel to make us can have different implementation.
  • Loading branch information
zymap committed Jun 18, 2021
1 parent 16e8ba7 commit d552434
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.bookie;

import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.channels.FileChannel;

/**
* A FileChannel for the JournalChannel read and write, we can use this interface to extend the FileChannel
* which we use in the JournalChannel.
*/
interface BookieFileChannel {

/**
* An interface for get the FileChannel from the provider.
* @return
*/
FileChannel getFileChannel();

/**
* Check the given file if exists.
*
* @param file
* @return
*/
boolean fileExists(File file);

/**
* Get the file descriptor of the opened file.
*
* @return
* @throws IOException
*/
FileDescriptor getFD() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.bookie;

import org.apache.bookkeeper.conf.ServerConfiguration;

import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public class DefaultFileChannelProvider implements FileChannelProvider{
@Override
public BookieFileChannel open(File file, ServerConfiguration configuration) throws IOException {
return new DefaultFileChannel(file, configuration);
}

static class DefaultFileChannel implements BookieFileChannel {
private final File file;
private final RandomAccessFile randomAccessFile;
private final ServerConfiguration configuration;

DefaultFileChannel(File file, ServerConfiguration serverConfiguration) throws IOException {
this.file = file;
this.randomAccessFile = new RandomAccessFile(file, "rw");
this.configuration = serverConfiguration;
}

@Override
public FileChannel getFileChannel() {
return randomAccessFile.getChannel();
}

@Override
public boolean fileExists(File file) {
return file.exists();
}

@Override
public FileDescriptor getFD() throws IOException {
return randomAccessFile.getFD();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.bookie;

import org.apache.bookkeeper.conf.ServerConfiguration;

import java.io.File;
import java.io.IOException;

/**
* An interface of the FileChannelProvider.
*/
public interface FileChannelProvider {
static FileChannelProvider newProvider(String providerClassName) throws IOException {
try {
Class<?> providerClass = Class.forName(providerClassName);
Object obj = providerClass.newInstance();
return (FileChannelProvider) obj;
} catch (Exception e) {
throw new IOException(e);
}
}

/**
* Get the BookieFileChannel with the given file and configuration.
*
* @param file
* @param configuration
* @return
* @throws IOException
*/
BookieFileChannel open(File file, ServerConfiguration configuration) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,10 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, conf);
} else {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize,
journalPos);
journalPos, conf);
}
int journalVersion = recLog.getFormatVersion();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.channels.FileChannel;
import java.util.Arrays;

import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.NativeIO;
import org.apache.bookkeeper.util.ZeroBuffer;
import org.slf4j.Logger;
Expand All @@ -45,7 +46,7 @@
class JournalChannel implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(JournalChannel.class);

final RandomAccessFile randomAccessFile;
final BookieFileChannel channel;
final int fd;
final FileChannel fc;
final BufferedChannel bc;
Expand Down Expand Up @@ -85,21 +86,23 @@ class JournalChannel implements Closeable {
// The position of the file channel's last drop position
private long lastDropPosition = 0L;

final ServerConfiguration configuration;

// Mostly used by tests
JournalChannel(File journalDirectory, long logId) throws IOException {
this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE);
this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE, new ServerConfiguration());
}

// Open journal for scanning starting from the first record in journal.
JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE);
JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, ServerConfiguration conf) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, conf);
}

// Open journal for scanning starting from given position.
JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, long position) throws IOException {
long preAllocSize, int writeBufferSize, long position, ServerConfiguration conf) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE,
position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER, conf);
}

// Open journal to write
Expand All @@ -115,7 +118,7 @@ class JournalChannel implements Closeable {
boolean fRemoveFromPageCache, int formatVersionToWrite,
Journal.BufferedChannelBuilder bcBuilder) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder);
START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder, new ServerConfiguration());
}

/**
Expand Down Expand Up @@ -143,27 +146,39 @@ class JournalChannel implements Closeable {
private JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
long position, boolean fRemoveFromPageCache,
int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException {
int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder,
ServerConfiguration configuration) throws IOException {
this.journalAlignSize = journalAlignSize;
this.zeros = ByteBuffer.allocate(journalAlignSize);
this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
this.fRemoveFromPageCache = fRemoveFromPageCache;
this.configuration = configuration;

File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
FileChannelProvider provider;
// Create the file channel provider with given configuration. Fallback to use default FileChannel if
// load failed.
try {
provider = FileChannelProvider.newProvider(configuration.getJournalChannelProvider());
} catch (IOException e) {
LOG.warn("Failed to load journal channel provider '{}', fallback to the default JournalChannel", configuration.getJournalChannelProvider(), e);
provider = new DefaultFileChannelProvider();
}
channel = provider.open(fn, configuration);

if (formatVersionToWrite < V4) {
throw new IOException("Invalid journal format to write : version = " + formatVersionToWrite);
}

LOG.info("Opening journal {}", fn);
if (!fn.exists()) { // new file, write version
if (!channel.fileExists(fn)) { // new file, write version
if (!fn.createNewFile()) {
LOG.error("Journal file {}, that shouldn't exist, already exists. "
+ " is there another bookie process running?", fn);
throw new IOException("File " + fn
+ " suddenly appeared, is another bookie process running?");
}
randomAccessFile = new RandomAccessFile(fn, "rw");
fc = openFileChannel(randomAccessFile);
fc = channel.getFileChannel();
formatVersion = formatVersionToWrite;

int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE;
Expand All @@ -180,8 +195,7 @@ private JournalChannel(File journalDirectory, long logId,
nextPrealloc = this.preAllocSize;
fc.write(zeros, nextPrealloc - journalAlignSize);
} else { // open an existing file
randomAccessFile = new RandomAccessFile(fn, "r");
fc = openFileChannel(randomAccessFile);
fc = channel.getFileChannel();
bc = null; // readonly

ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE);
Expand Down Expand Up @@ -231,7 +245,7 @@ private JournalChannel(File journalDirectory, long logId,
}
}
if (fRemoveFromPageCache) {
this.fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
this.fd = NativeIO.getSysFileDescriptor(channel.getFD());
} else {
this.fd = -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String JOURNAL_QUEUE_SIZE = "journalQueueSize";
protected static final String JOURNAL_MAX_MEMORY_SIZE_MB = "journalMaxMemorySizeMb";
protected static final String JOURNAL_PAGECACHE_FLUSH_INTERVAL_MSEC = "journalPageCacheFlushIntervalMSec";
protected static final String JOURNAL_CHANNEL_PROVIDER = "journalChannelProvider";
// backpressure control
protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
Expand Down Expand Up @@ -868,6 +869,15 @@ public long getJournalPageCacheFlushIntervalMSec() {
return this.getLong(JOURNAL_PAGECACHE_FLUSH_INTERVAL_MSEC, 1000);
}

public ServerConfiguration setJournalChannelProvider(String journalChannelProvider) {
this.setProperty(JOURNAL_CHANNEL_PROVIDER, journalChannelProvider);
return this;
}

public String getJournalChannelProvider() {
return this.getString(JOURNAL_CHANNEL_PROVIDER);
}

/**
* Get max number of adds in progress. 0 == unlimited.
*
Expand Down

0 comments on commit d552434

Please sign in to comment.