Skip to content

Commit

Permalink
Support open with O_RDWR flag
Browse files Browse the repository at this point in the history
Currently, Mountpoint supports either open with O_WRONLY or O_RDONLY
because we don't allow applications to do both read and write at the same
time. However, it's possible support O_RDWR flag too since we can decide
at open time whether to give a read handle or a write handle back, and
for any inode it's never possible for both start_reading and start_writing
to work.

Signed-off-by: Monthon Klongklaew <monthonk@amazon.com>
  • Loading branch information
monthonk committed Jul 13, 2023
1 parent b6493af commit 8fb1917
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 44 deletions.
91 changes: 56 additions & 35 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,54 @@ enum FileHandleType<Client: ObjectClient, Runtime> {
},
}

impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
async fn new_write_handle(
lookup: &LookedUp,
ino: InodeNo,
flags: i32,
fs: &S3Filesystem<Client, Runtime>,
) -> Result<FileHandleType<Client, Runtime>, libc::c_int> {
// We can't support O_SYNC writes because they require the data to go to stable storage
// at `write` time, but we only commit a PUT at `close` time.
if flags & (libc::O_SYNC | libc::O_DSYNC) != 0 {
error!("O_SYNC and O_DSYNC are unsupported");
return Err(libc::EINVAL);
}

let handle = match fs.superblock.write(&fs.client, ino, lookup.inode.parent()).await {
Ok(handle) => handle,
Err(e) => {
error!("open failed: {e:?}");
return Err(e.into());
}
};
let key = lookup.inode.full_key();
let handle = match fs.uploader.put(&fs.bucket, key).await {
Err(e) => {
error!(key, "put failed to start: {e:?}");
return Err(libc::EIO);
}
Ok(request) => FileHandleType::Write {
request: UploadState::InProgress(request).into(),
handle,
},
};
Ok(handle)
}

async fn new_read_handle(lookup: &LookedUp) -> Result<FileHandleType<Client, Runtime>, libc::c_int> {
lookup.inode.start_reading()?;
let handle = FileHandleType::Read {
request: Default::default(),
etag: match &lookup.stat.etag {
None => return Err(libc::EBADF),
Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"),
},
};
Ok(handle)
}
}

#[derive(Debug)]
enum UploadState<Client: ObjectClient> {
InProgress(UploadRequest<Client>),
Expand Down Expand Up @@ -375,43 +423,16 @@ where
}

let handle_type = if flags & libc::O_RDWR != 0 {
error!("O_RDWR is unsupported");
return Err(libc::EINVAL);
} else if flags & libc::O_WRONLY != 0 {
// We can't support O_SYNC writes because they require the data to go to stable storage
// at `write` time, but we only commit a PUT at `close` time.
if flags & (libc::O_SYNC | libc::O_DSYNC) != 0 {
error!("O_SYNC and O_DSYNC are unsupported");
return Err(libc::EINVAL);
}

let handle = match self.superblock.write(&self.client, ino, lookup.inode.parent()).await {
Ok(handle) => handle,
Err(e) => {
error!("open failed: {e:?}");
return Err(e.into());
}
};
let key = lookup.inode.full_key();
match self.uploader.put(&self.bucket, key).await {
Err(e) => {
error!(key, "put failed to start: {e:?}");
return Err(libc::EIO);
}
Ok(request) => FileHandleType::Write {
request: UploadState::InProgress(request).into(),
handle,
},
let remote_file = lookup.inode.is_remote()?;
if remote_file {
FileHandleType::new_read_handle(&lookup).await?
} else {
FileHandleType::new_write_handle(&lookup, ino, flags, self).await?
}
} else if flags & libc::O_WRONLY != 0 {
FileHandleType::new_write_handle(&lookup, ino, flags, self).await?
} else {
lookup.inode.start_reading()?;
FileHandleType::Read {
request: Default::default(),
etag: match lookup.stat.etag {
None => return Err(libc::EBADF),
Some(etag) => ETag::from_str(&etag).expect("E-Tag should be set"),
},
}
FileHandleType::new_read_handle(&lookup).await?
};

let full_key = lookup.inode.full_key().to_owned();
Expand Down
5 changes: 5 additions & 0 deletions mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,11 @@ impl Inode {
*lookup_count
}

pub fn is_remote(&self) -> Result<bool, InodeError> {
let state = self.get_inode_state()?;
Ok(state.write_status == WriteStatus::Remote)
}

pub fn start_reading(&self) -> Result<(), InodeError> {
let state = self.get_inode_state()?;
match state.write_status {
Expand Down
69 changes: 60 additions & 9 deletions mountpoint-s3/tests/fuse_tests/write_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,57 @@ where
// We shouldn't be allowed to open the file for writing again
let err = open_for_write(&path, append).expect_err("can't write existing file");
assert_eq!(err.kind(), ErrorKind::PermissionDenied);

// Also test writing with O_RDWR flag
let path = mount_point.path().join("dir/new_rdwr.txt");

let mut f = File::options().read(true).write(true).create(true).open(&path).unwrap();

// The file is visible with size 0 as soon as we open it for write
let m = metadata(&path).unwrap();
assert_eq!(m.len(), 0);

// verify the new file is visible in readdir
let read_dir_iter = read_dir(&subdir).unwrap();
let dir_entry_names = read_dir_to_entry_names(read_dir_iter);
assert_eq!(dir_entry_names, vec!["hello.txt", "new.txt", "new_rdwr.txt"]);

let mut rng = ChaCha20Rng::seed_from_u64(0x12345678 + OBJECT_SIZE as u64);
let mut body = vec![0u8; OBJECT_SIZE];
rng.fill(&mut body[..]);

for part in body.chunks(WRITE_SIZE) {
f.write_all(part).unwrap();
}

// We shouldn't be able to read from a file mid-write
let err = f.read(&mut [0u8; 1]).expect_err("can't read file while writing");
assert_eq!(err.raw_os_error(), Some(libc::EBADF));

drop(f);

// The kernel doesn't guarantee to flush the data as soon as the file is closed. Currently,
// the file won't be visible on the file system until it's flushed to S3, and so trying to stat
// the file will fail.
// TODO we can remove this when we implement fsync, or change it when we make files visible
// during writes
std::thread::sleep(Duration::from_secs(5));

// Now it's closed, we can stat or read it
let m = metadata(&path).unwrap();
assert_eq!(m.len(), body.len() as u64);

let buf = read(&path).unwrap();
assert_eq!(&buf[..], &body[..]);

// Readdir should still work correctly
let read_dir_iter = read_dir(&subdir).unwrap();
let dir_entry_names = read_dir_to_entry_names(read_dir_iter);
assert_eq!(dir_entry_names, vec!["hello.txt", "new.txt", "new_rdwr.txt"]);

// We shouldn't be allowed to open the file for writing again
let err = open_for_write(&path, append).expect_err("can't write existing file");
assert_eq!(err.kind(), ErrorKind::PermissionDenied);
}

#[cfg(feature = "s3_tests")]
Expand Down Expand Up @@ -117,15 +168,6 @@ where
let err = open_for_write(&path, true).expect_err("can't write existing file");
assert_eq!(err.kind(), ErrorKind::PermissionDenied);

// New files can't be opened in O_RDWR
let err = File::options()
.read(true)
.write(true)
.create(true)
.open(&path)
.expect_err("O_RDWR should fail");
assert_eq!(err.kind(), ErrorKind::InvalidInput);

// New files can't be opened with O_SYNC
let err = File::options()
.write(true)
Expand All @@ -141,6 +183,15 @@ where
.write(b"hello world")
.expect_err("writing to O_RDONLY file should fail");
assert_eq!(err.raw_os_error(), Some(libc::EBADF));

// We can't write to an existing file even it's opened in O_RDWR
let mut file = File::options().read(true).write(true).create(true).open(&path).unwrap();
let err = file
.write(b"hello world")
.expect_err("write to an existing file should fail");
assert_eq!(err.raw_os_error(), Some(libc::EBADF));
// However, read should work
assert!(file.read(&mut [0u8; 1]).is_ok());
}

#[cfg(feature = "s3_tests")]
Expand Down

0 comments on commit 8fb1917

Please sign in to comment.