Skip to content

Set the default buffer size (if not specified) to 128k #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions src/main/java/org/apache/hadoop/fs/glusterfs/GlusterVolume.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,16 @@

public class GlusterVolume extends RawLocalFileSystem{

static final Logger log = LoggerFactory.getLogger(GlusterVolume.class);

static final Logger log = LoggerFactory.getLogger(GlusterFileSystemCRC.class);
/**
 * General reason for these constants is to help us decide
 * when to override the specified buffer size. See implementation
 * of logic below, which might change over time.
 */
public static final int OVERRIDE_WRITE_BUFFER_SIZE = 1024 * 4;
public static final int OPTIMAL_WRITE_BUFFER_SIZE = 1024 * 128;

public static final URI NAME = URI.create("glusterfs:///");

protected String root=null;
Expand Down Expand Up @@ -84,18 +92,25 @@ public void setConf(Configuration conf){
mkdirs(mapredSysDirectory);
}

/**
* Ensure the initial working directory exists
**/
superUser = conf.get("gluster.daemon.user", null);

aclFilter = new AclPathFilter(conf);

/* ensure the initial working directory exists */
Path workingDirectory = getInitialWorkingDirectory();
mkdirs(workingDirectory);

//volName=conf.get("fs.glusterfs.volname", null);
//remoteGFSServer=conf.get("fs.glusterfs.server", null);

}catch (Exception e){

/**
* Write Buffering
*/
Integer userBufferSize=conf.getInt("io.file.buffer.size", -1);
if(userBufferSize == OVERRIDE_WRITE_BUFFER_SIZE || userBufferSize == -1) {
conf.setInt("io.file.buffer.size", OPTIMAL_WRITE_BUFFER_SIZE);
}
log.info("Write buffer size : " +conf.getInt("io.file.buffer.size",-1)) ;
}
catch (Exception e){
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package org.apache.hadoop.fs.test.connector.glusterfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.glusterfs.GlusterVolume;
import org.apache.hadoop.fs.test.connector.HcfsTestConnector;

/**
* A HCFS test connector specifically for instantiation and testing Glusterfs.
*/
public class GlusterFileSystemTestConnector extends HcfsTestConnector{
public Configuration createConfiguration(){

/**
 * Builds the Hadoop configuration used to exercise Glusterfs on top of the
 * base HCFS test configuration: the mount point comes from the GLUSTER_MOUNT
 * system property, and the write buffer size is pinned to
 * {@link GlusterVolume#OVERRIDE_WRITE_BUFFER_SIZE}.
 */
public Configuration createConfiguration(){
    Configuration conf = super.createConfiguration();
    conf.set("fs.glusterfs.mount", System.getProperty("GLUSTER_MOUNT"));
    conf.set("fs.glusterfs.impl", "org.apache.hadoop.fs.local.GlusterFs");
    conf.set("fs.default.name", "glusterfs:///");
    conf.setInt("io.file.buffer.size", GlusterVolume.OVERRIDE_WRITE_BUFFER_SIZE);
    return conf;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.apache.hadoop.fs.test.unit;

import static org.apache.hadoop.fs.FileSystemTestHelper.getTestRootPath;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.glusterfs.GlusterVolume;
import org.apache.hadoop.fs.test.connector.HcfsTestConnectorFactory;
import org.apache.hadoop.fs.test.connector.HcfsTestConnectorInterface;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A class for performance and IO related unit tests.
 *
 * - Write buffering
 * - Read buffering
 * - Object caching / File lookup caching.
 * - Seeking
 */
public class HCFSPerformanceIOTests {

    /** File system under test; created once and shared by all tests in this class. */
    static FileSystem fs;

    /** Standard SLF4J pattern: one private static final logger per class. */
    private static final Logger log = LoggerFactory.getLogger(HCFSPerformanceIOTests.class);

    /** String chunk repeatedly appended to the file we are writing. */
    static final String CONTENT = "1234";

    @BeforeClass
    public static void setup() throws Exception {
        HcfsTestConnectorInterface connector = HcfsTestConnectorFactory.getHcfsTestConnector();
        fs = connector.create();
    }

    @AfterClass
    public static void after() throws IOException {
        fs.close();
    }

    /**
     * Path written by the buffering tests; suffixed with the class name so it
     * does not collide with paths used by other test classes.
     */
    public Path bufferoutpath() {
        return getTestRootPath(fs, "buffering_test" + HCFSPerformanceIOTests.class.getName());
    }

    @After
    public void tearDown() throws Exception {
        fs.delete(bufferoutpath(), true);
    }

    /**
     * This is a complex test. It documents the expected behaviour of the
     * FileSystem buffering.
     *
     * It assumes that the configuration value of FS is == the {@link GlusterVolume}
     * OVERRIDE_WRITE_BUFFER_SIZE. Then, it starts writing to a stream and asserts
     * that nothing reaches the file until the buffer fills.
     */
    @Test
    public void testBufferSpill() throws Exception {

        /*
         * Sanity check: This test expects that an override is being performed,
         * i.e., that the buffering is going to be set to the optimal size,
         * because the file system detected that the configured original buffer
         * size was == to the "bad default" value which we have decided to
         * override, for the sake of "reasonable defaults" out of the box.
         */
        Assert.assertEquals(
            GlusterVolume.OPTIMAL_WRITE_BUFFER_SIZE,
            fs.getConf().getInt("io.file.buffer.size", -1));

        FSDataOutputStream os = fs.create(bufferoutpath());

        // Encode the chunk once instead of on every loop iteration.
        final byte[] chunk = CONTENT.getBytes();
        int written = 0;

        /*
         * Now, we assert that no data is spilled to disk until we reach the
         * optimal size.
         */
        while (written < GlusterVolume.OPTIMAL_WRITE_BUFFER_SIZE) {
            os.write(chunk);
            written += chunk.length;
            Assert.assertTrue("asserting that file not written yet...",
                fs.getLength(bufferoutpath()) == 0);
        }
        os.flush();

        Assert.assertTrue("asserting that is now written... ",
            fs.getLength(bufferoutpath()) >= GlusterVolume.OPTIMAL_WRITE_BUFFER_SIZE);

        os.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

package org.apache.hadoop.fs.test.unit;

import static org.apache.hadoop.fs.FileSystemTestHelper.getTestRootPath;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand All @@ -40,6 +41,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.test.connector.HcfsTestConnectorFactory;
import org.apache.hadoop.fs.test.connector.HcfsTestConnectorInterface;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -62,9 +64,36 @@ public static void setup() throws Exception {
// Runs once after every test in this class has finished: releases the
// shared FileSystem opened in setup().
@AfterClass
public static void after() throws IOException{
fs.close();

}

/** After each test, recursively remove the per-test root directory. */
@After
public void tearDown() throws Exception {
    Path testRoot = getTestRootPath(fs, "test");
    fs.delete(testRoot, true);
}

/**
 * Asserts the write-buffering contract: bytes written to the stream stay
 * in memory (file length 0) until flush, after which they are on disk.
 */
@Test
public void testBufferSpill() throws Exception {
    Path out = new Path("a");

    FSDataOutputStream os = fs.create(out);

    // Encode the chunk once rather than on every iteration.
    final byte[] chunk = "ASDF".getBytes();
    int written = 0;

    /*
     * Assert that writes smaller than 10KB are NOT spilled to disk.
     */
    while (written < 10000) {
        os.write(chunk);
        written += chunk.length;
        // Nothing should have reached the file before the flush.
        Assert.assertTrue("asserting that file not written yet", fs.getLength(out) == 0);
    }
    os.flush();
    // Fixed assertion message: after the flush the file SHOULD be written
    // (the original message wrongly said "not written yet").
    Assert.assertTrue("asserting that file is now written", fs.getLength(out) >= 10000);

    os.close();
    // delete(Path) is deprecated in Hadoop; use the explicit two-arg form.
    fs.delete(out, true);
}

@org.junit.Test
public void testTolerantMkdirs() throws Exception{
Path longPath=new Path("a/b/c/d");
Expand Down