Skip to content

Commit

Permalink
enhance: Use fdopen, fwrite to reduce direct syscall (milvus-io#38157)
Browse files Browse the repository at this point in the history
`File.Write` and `File.WriteInt` use `write`, which may be a direct
syscall on some systems. When mapping field data and writing it row by
row, this can cost a lot of CPU time when the number of rows is large.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Dec 3, 2024
1 parent e2521d8 commit b8600ac
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
31 changes: 28 additions & 3 deletions internal/core/src/common/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@

#pragma once

#include <cstdio>
#include <string>
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "fmt/core.h"
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

namespace milvus {
Expand All @@ -27,8 +29,8 @@ class File {
file.fd_ = -1;
}
// Releases the file when the object goes out of scope.
// fclose() flushes pending buffered data and closes the descriptor the
// stream was created from, so the descriptor must not be closed separately
// with close() as well.
~File() {
    if (fs_ != nullptr) {
        fclose(fs_);
    }
}

Expand Down Expand Up @@ -63,22 +65,45 @@ class File {
return write(fd_, &value, sizeof(value));
}

// Buffered write: hands `size` bytes starting at `buf` to the stdio stream
// `fs_` via fwrite().
// Returns the number of bytes fwrite() accepted; a value smaller than `size`
// indicates a write error.
ssize_t
FWrite(const void* buf, size_t size) {
    const size_t accepted = fwrite(buf, 1, size, fs_);
    return static_cast<ssize_t>(accepted);
}

// Buffered write of one integral value: copies the sizeof(T) bytes of
// `value` exactly as they are laid out in memory (no byte-order conversion)
// to the stdio stream `fs_`.
// Returns the number of bytes fwrite() accepted; sizeof(T) on success.
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
ssize_t
FWriteInt(T value) {
    const void* raw_bytes = &value;
    const size_t accepted = fwrite(raw_bytes, sizeof(char), sizeof(T), fs_);
    return static_cast<ssize_t>(accepted);
}

// Forces any bytes still held in the stdio buffer of `fs_` out to the
// underlying file.
// Returns fflush()'s status unchanged: 0 on success, EOF on failure.
int
FFlush() {
    const int status = fflush(fs_);
    return status;
}

// Repositions the file offset of the underlying descriptor.
//
// `offset` and `whence` are passed to lseek() unchanged. Returns the
// resulting offset as reported by lseek(), or -1 on failure.
//
// Bytes written through FWrite()/FWriteInt() may still be sitting in the
// stdio buffer of `fs_`. They are flushed first so they land at the old
// position; otherwise a later flush would write them at the new offset.
offset_t
Seek(offset_t offset, int whence) {
    if (fs_ != nullptr && fflush(fs_) != 0) {
        // Same failure value lseek() itself reports; errno is set by fflush().
        return static_cast<off_t>(-1);
    }
    return lseek(fd_, offset, whence);
}

// Closes the file explicitly and marks the object as no longer holding one.
//
// fclose() flushes buffered data and closes the descriptor the stream was
// created from, so the descriptor is not closed a second time with close().
// Closing it first would also make the flush inside fclose() fail.
// Calling Close() again is a no-op because `fs_` is reset to nullptr.
void
Close() {
    if (fs_ != nullptr) {
        fclose(fs_);
        fs_ = nullptr;
    }
    fd_ = -1;
}

private:
// Wraps an already-open descriptor `fd` in a buffered stdio stream.
// `filepath` is stored and used in the error message.
//
// NOTE(review): mode "wb+" asks for a read/write stream; fdopen() may reject
// descriptors that were not opened O_RDWR. Confirm against the flags used by
// the callers of this constructor.
// NOTE(review): assumes AssertInfo does not return normally when its
// condition is false. Confirm in common/EasyAssert.h.
explicit File(int fd, const std::string& filepath)
    : fd_(fd), filepath_(filepath) {
    fs_ = fdopen(fd_, "wb+");
    if (fs_ == nullptr) {
        // The destructor never runs for a partially constructed object, so
        // release the descriptor here instead of leaking it. errno is saved
        // first because close() may overwrite it.
        const int fdopen_errno = errno;
        close(fd_);
        fd_ = -1;
        AssertInfo(fs_ != nullptr,
                   "failed to open file {}: {}",
                   filepath,
                   strerror(fdopen_errno));
    }
}
// Raw POSIX descriptor; -1 when the object holds no descriptor.
int fd_{-1};
// Buffered stdio stream created from `fd_`. Initialized to nullptr so the
// `fs_ != nullptr` check in the destructor never reads an indeterminate
// pointer.
FILE* fs_{nullptr};
// Path supplied at construction; used in the constructor's error message.
std::string filepath_;
};
} // namespace milvus
17 changes: 9 additions & 8 deletions internal/core/src/mmap/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ WriteFieldData(File& file,
auto str =
static_cast<const std::string*>(data->RawValue(i));
ssize_t written_data_size =
file.WriteInt<uint32_t>(uint32_t(str->size()));
file.FWriteInt<uint32_t>(uint32_t(str->size()));
if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data_size;
auto written_data = file.Write(str->data(), str->size());
auto written_data = file.FWrite(str->data(), str->size());
if (written_data < str->size()) {
THROW_FILE_WRITE_ERROR
}
Expand All @@ -120,14 +120,14 @@ WriteFieldData(File& file,
indices.push_back(total_written);
auto padded_string =
static_cast<const Json*>(data->RawValue(i))->data();
ssize_t written_data_size =
file.WriteInt<uint32_t>(uint32_t(padded_string.size()));
ssize_t written_data_size = file.FWriteInt<uint32_t>(
uint32_t(padded_string.size()));
if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data_size;
ssize_t written_data =
file.Write(padded_string.data(), padded_string.size());
file.FWrite(padded_string.data(), padded_string.size());
if (written_data < padded_string.size()) {
THROW_FILE_WRITE_ERROR
}
Expand All @@ -141,7 +141,7 @@ WriteFieldData(File& file,
indices.push_back(total_written);
auto array = static_cast<const Array*>(data->RawValue(i));
ssize_t written =
file.Write(array->data(), array->byte_size());
file.FWrite(array->data(), array->byte_size());
if (written < array->byte_size()) {
THROW_FILE_WRITE_ERROR
}
Expand All @@ -157,7 +157,7 @@ WriteFieldData(File& file,
static_cast<const knowhere::sparse::SparseRow<float>*>(
data->RawValue(i));
ssize_t written =
file.Write(vec->data(), vec->data_byte_size());
file.FWrite(vec->data(), vec->data_byte_size());
if (written < vec->data_byte_size()) {
break;
}
Expand All @@ -172,7 +172,7 @@ WriteFieldData(File& file,
}
} else {
// write as: data|data|data|data|data|data......
size_t written = file.Write(data->Data(), data->Size());
size_t written = file.FWrite(data->Data(), data->Size());
if (written < data->Size()) {
THROW_FILE_WRITE_ERROR
}
Expand All @@ -181,5 +181,6 @@ WriteFieldData(File& file,
total_written += data->Size(i);
}
}
file.FFlush();
}
} // namespace milvus

0 comments on commit b8600ac

Please sign in to comment.