Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

[WIP] Allow plug the FileChannel in the Journal #381

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.bookie;

import java.io.File;
import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.FileChannel;

/**
* A FileChannel for the JournalChannel read and write, we can use this interface to extend the FileChannel
* which we use in the JournalChannel.
*/
interface BookieFileChannel {

/**
* An interface for get the FileChannel from the provider.
* @return
*/
FileChannel getFileChannel() throws FileNotFoundException, IOException;

/**
* Check the given file if exists.
*
* @param file
* @return
*/
boolean fileExists(File file);

/**
* Get the file descriptor of the opened file.
*
* @return
* @throws IOException
*/
FileDescriptor getFD() throws IOException;

/**
* Close file channel and release all resources.
*/
void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.bookkeeper.bookie;

import java.io.File;
import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import org.apache.bookkeeper.conf.ServerConfiguration;

class DefaultFileChannel implements BookieFileChannel {
private final File file;
private RandomAccessFile randomAccessFile;
private final ServerConfiguration configuration;

DefaultFileChannel(File file, ServerConfiguration serverConfiguration) throws IOException {
this.file = file;
this.configuration = serverConfiguration;
}

@Override
public FileChannel getFileChannel() throws FileNotFoundException {
synchronized (this) {
if (randomAccessFile == null) {
randomAccessFile = new RandomAccessFile(file, "rw");
}
return randomAccessFile.getChannel();
}
}

@Override
public boolean fileExists(File file) {
return file.exists();
}

@Override
public FileDescriptor getFD() throws IOException {
synchronized (this) {
if (randomAccessFile == null) {
throw new IOException("randomAccessFile is null, please initialize it by calling getFileChannel");
}
return randomAccessFile.getFD();
}
}

@Override
public void close() throws IOException {
synchronized (this) {
if (randomAccessFile != null) {
randomAccessFile.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.bookkeeper.bookie;

import java.io.File;
import java.io.IOException;
import org.apache.bookkeeper.conf.ServerConfiguration;

/**
* A wrapper of FileChannel.
*/
public class DefaultFileChannelProvider implements FileChannelProvider{
@Override
public BookieFileChannel open(File file, ServerConfiguration configuration) throws IOException {
return new DefaultFileChannel(file, configuration);
}

@Override
public void close(BookieFileChannel bookieFileChannel) throws IOException {
bookieFileChannel.close();
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.bookkeeper.bookie;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import org.apache.bookkeeper.conf.ServerConfiguration;

/**
* An interface of the FileChannelProvider.
*/
public interface FileChannelProvider extends Closeable {
/**
*
* @param providerClassName Provided class name for file channel.
* @return FileChannelProvider. A file channel provider loaded from providerClassName
* @throws IOException Possible IOException.
*/
static FileChannelProvider newProvider(String providerClassName) throws IOException {
try {
Class<?> providerClass = Class.forName(providerClassName);
Object obj = providerClass.getConstructor().newInstance();
return (FileChannelProvider) obj;
} catch (Exception e) {
throw new IOException(e);
}
}

/**
* Get the BookieFileChannel with the given file and configuration.
*
* @param file File path related to bookie.
* @param configuration Server configuration.
* @return BookieFileChannel related to file parameter.
* @throws IOException Possible IOException.
*/
BookieFileChannel open(File file, ServerConfiguration configuration) throws IOException;

/**
* Close bookieFileChannel.
* @param bookieFileChannel The bookieFileChannel to be closed.
* @throws IOException Possible IOException.
*/
void close(BookieFileChannel bookieFileChannel) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,10 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, conf);
} else {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize,
journalPos);
journalPos, conf);
}
int journalVersion = recLog.getFormatVersion();
try {
Expand Down Expand Up @@ -960,7 +960,7 @@ public void run() {
journalCreationWatcher.reset().start();
logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
journalAlignmentSize, removePagesFromCache,
journalFormatVersionToWrite, getBufferedChannelBuilder());
journalFormatVersionToWrite, getBufferedChannelBuilder(), conf);

journalStats.getJournalCreationStats().registerSuccessfulEvent(
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
Expand Down
Loading