Skip to content

Commit

Permalink
blk_io: Chunked IO, atomic fsize, better handling
Browse files Browse the repository at this point in the history
- Read and write buffers in a chunked loop, so that some hosts do not get confused by oversized requests: no more than 256 MiB is passed to a single syscall. Fixes #81.
- Handle interrupted syscalls (EINTR) and partial IO by continuing with the remainder of the buffer
- Optimize the early return on reaching EOF by checking the file size in userspace, which avoids an extra read call that would only return 0
- Make the file size counter atomic, so that multiple IO threads calling rvwrite() concurrently do not clash when updating it
  • Loading branch information
LekKit authored Mar 13, 2023
1 parent 4085ca5 commit c47cbf7
Showing 1 changed file with 66 additions and 27 deletions.
93 changes: 66 additions & 27 deletions src/blk_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
#define _FILE_OFFSET_BITS 64
#define _LARGEFILE64_SOURCE

#include <string.h>
#include "blk_io.h"
#include "utils.h"
#include "spinlock.h"
#include "threading.h"
#include <string.h>

// Maximum buffer size processed per internal IO syscall (0x10000000 bytes = 256 MiB).
// rvread()/rvwrite() split larger requests into chunks of at most this size.
#define RVFILE_MAX_BUFF 0x10000000

#if (defined(__unix__) || defined(__APPLE__) || defined(__HAIKU__)) && !defined(USE_STDIO)
// POSIX implementation using open, pread, pwrite...
Expand Down Expand Up @@ -226,7 +229,7 @@ void rvclose(rvfile_t *file)
// Returns the cached size of the file in bytes, or 0 when file is NULL.
// The size is loaded atomically, since rvwrite() may grow it from another
// thread while this call is in progress.
uint64_t rvfilesize(rvfile_t* file)
{
    if (!file) return 0;
    return atomic_load_uint64(&file->size);
}

#if defined(POSIX_FILE_IMPL) && defined(RVREAD_MMAP_FILE)
Expand All @@ -253,25 +256,41 @@ static void rvread_mmap_file(rvfile_t* file, void* destination, size_t count, ui

size_t rvread(rvfile_t* file, void* destination, size_t count, uint64_t offset)
{
if (!file) return 0;
uint64_t pos_real = (offset == RVFILE_CURPOS) ? file->pos : offset;
if (!file || count == 0) return 0;
uint64_t pos = (offset == RVFILE_CURPOS) ? file->pos : offset;
uint8_t* buffer = destination;
size_t ret = 0;
#if defined(POSIX_FILE_IMPL)
ssize_t ret = pread(file->fd, destination, count, pos_real);
if (ret < 0) ret = 0;
while (ret < count) {
size_t size = EVAL_MIN(count - ret, RVFILE_MAX_BUFF);
ssize_t tmp = pread(file->fd, buffer + ret, size, pos + ret);
if (tmp > 0) ret += tmp;
if (tmp == 0 || (tmp < 0 && errno != EINTR) || pos + ret >= rvfilesize(file)) break;
}
#ifdef RVREAD_MMAP_FILE
rvread_mmap_file(file, destination, ret, pos_real);
rvread_mmap_file(file, destination, ret, pos);
#endif
#elif defined(WIN32_FILE_IMPL)
OVERLAPPED overlapped = { .OffsetHigh = pos_real >> 32, .Offset = (uint32_t)pos_real };
DWORD ret = 0;
ReadFile(file->handle, destination, count, &ret, &overlapped);
while (ret < count) {
size_t size = EVAL_MIN(count - ret, RVFILE_MAX_BUFF);
OVERLAPPED overlapped = { .OffsetHigh = (pos + ret) >> 32, .Offset = (uint32_t)(pos + ret) };
DWORD tmp = 0;
ReadFile(file->handle, buffer + ret, size, &tmp, &overlapped);
ret += tmp;
if (tmp == 0 || pos + ret >= rvfilesize(file)) break;
}
#else
spin_lock_slow(&file->lock);
if (pos_real != file->pos_real || !(file->pos_state & FILE_POS_READ)) {
fseek(file->fp, pos_real, SEEK_SET);
if (pos != file->pos_real || !(file->pos_state & FILE_POS_READ)) {
fseek(file->fp, pos, SEEK_SET);
}
size_t ret = fread(destination, 1, count, file->fp);
file->pos_real = pos_real + ret;
while (ret < count) {
size_t size = EVAL_MIN(count - ret, RVFILE_MAX_BUFF);
size_t tmp = fread(buffer + ret, 1, size, file->fp);
ret += tmp;
if (tmp == 0 || pos + ret >= rvfilesize(file)) break;
}
file->pos_real = pos + ret;
file->pos_state = FILE_POS_READ;
spin_unlock(&file->lock);
#endif
Expand All @@ -281,27 +300,47 @@ size_t rvread(rvfile_t* file, void* destination, size_t count, uint64_t offset)

// Writes up to count bytes from source into the file.
//
// file:   file handle; NULL results in 0 being returned
// source: buffer holding the data to write
// count:  number of bytes to write; 0 results in 0 being returned
// offset: absolute file offset, or RVFILE_CURPOS to write at file->pos
//         (file->pos is then advanced by the amount written)
//
// The buffer is passed to the backend in chunks of at most RVFILE_MAX_BUFF
// bytes, and partial writes are continued until the whole buffer is written
// or a call makes no progress.
//
// Returns the number of bytes actually written, which may be less than count.
size_t rvwrite(rvfile_t* file, const void* source, size_t count, uint64_t offset)
{
    if (!file || count == 0) return 0;
    uint64_t pos = (offset == RVFILE_CURPOS) ? file->pos : offset;
    const uint8_t* buffer = source;
    size_t ret = 0;
#if defined(POSIX_FILE_IMPL)
    while (ret < count) {
        size_t size = EVAL_MIN(count - ret, RVFILE_MAX_BUFF);
        ssize_t tmp = pwrite(file->fd, buffer + ret, size, pos + ret);
        if (tmp > 0) ret += tmp;
        // Retry when interrupted (EINTR); stop on a zero-length write or any other error
        if (tmp == 0 || (tmp < 0 && errno != EINTR)) break;
    }
#elif defined(WIN32_FILE_IMPL)
    while (ret < count) {
        size_t size = EVAL_MIN(count - ret, RVFILE_MAX_BUFF);
        // The 64-bit offset of this chunk is split across the two OVERLAPPED fields
        OVERLAPPED overlapped = { .OffsetHigh = (pos + ret) >> 32, .Offset = (uint32_t)(pos + ret) };
        DWORD tmp = 0;
        WriteFile(file->handle, buffer + ret, size, &tmp, &overlapped);
        ret += tmp;
        // tmp is still 0 when nothing was written, which ends the loop
        if (tmp == 0) break;
    }
#else
    spin_lock_slow(&file->lock);
    // Seek only when the stream is not already at pos in write mode
    if (pos != file->pos_real || !(file->pos_state & FILE_POS_WRITE)) {
        fseek(file->fp, pos, SEEK_SET);
    }
    while (ret < count) {
        size_t size = EVAL_MIN(count - ret, RVFILE_MAX_BUFF);
        size_t tmp = fwrite(buffer + ret, 1, size, file->fp);
        ret += tmp;
        if (tmp == 0) break;
    }
    // Remember where the stream is now and that it was last used for writing
    file->pos_real = pos + ret;
    file->pos_state = FILE_POS_WRITE;
    spin_unlock(&file->lock);
#endif
    if (offset == RVFILE_CURPOS) file->pos += ret;

    // Grow the cached file size if this write ended past the known end of file.
    // The check uses the same snapshot that is handed to the CAS, so the decision
    // and the exchange are based on one observed value; when the CAS fails, the
    // size is reloaded and checked again.
    uint64_t file_size = 0;
    do {
        file_size = atomic_load_uint64(&file->size);
        if (likely(pos + ret <= file_size)) break;
    } while (!atomic_cas_uint64_ex(&file->size, file_size, pos + ret, true, ATOMIC_RELEASE, ATOMIC_ACQUIRE));
    return ret;
}

Expand Down Expand Up @@ -331,7 +370,7 @@ bool rvseek(rvfile_t* file, int64_t offset, uint8_t startpos)
if (startpos == RVFILE_CUR) {
offset = file->pos + offset;
} else if (startpos == RVFILE_END) {
offset = file->size - offset;
offset = rvfilesize(file) - offset;
}
if (startpos != RVFILE_SET && offset < 0) return false;
file->pos = (uint64_t)offset;
Expand Down

0 comments on commit c47cbf7

Please sign in to comment.