Skip to content

use butil::IOBuf in WriteChunk #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/chunkserver/datastore/chunkserver_chunkfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ CSErrorCode CSChunkFile::LoadSnapshot(SequenceNum sn) {
}

CSErrorCode CSChunkFile::Write(SequenceNum sn,
const char * buf,
const butil::IOBuf& buf,
off_t offset,
size_t length,
uint32_t* cost) {
Expand Down
22 changes: 21 additions & 1 deletion src/chunkserver/datastore/chunkserver_chunkfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#define SRC_CHUNKSERVER_DATASTORE_CHUNKSERVER_CHUNKFILE_H_

#include <glog/logging.h>
#include <butil/iobuf.h>
#include <string>
#include <vector>
#include <set>
Expand Down Expand Up @@ -147,7 +148,7 @@ class CSChunkFile {
* @return: 返回错误码
*/
CSErrorCode Write(SequenceNum sn,
const char * buf,
const butil::IOBuf& buf,
off_t offset,
size_t length,
uint32_t* cost);
Expand Down Expand Up @@ -289,6 +290,25 @@ class CSChunkFile {
return rc;
}

inline int writeData(const butil::IOBuf& buf, off_t offset, size_t length) {
int rc = lfs_->Write(fd_, buf, offset + pageSize_, length);
if (rc < 0) {
return rc;
}
// 如果是clone chunk,需要判断是否需要更改bitmap并更新metapage
if (isCloneChunk_) {
uint32_t beginIndex = offset / pageSize_;
uint32_t endIndex = (offset + length - 1) / pageSize_;
for (uint32_t i = beginIndex; i <= endIndex; ++i) {
// 记录dirty page
if (!metaPage_.bitmap->Test(i)) {
dirtyPages_.insert(i);
}
}
}
return rc;
}

inline bool CheckOffsetAndLength(off_t offset, size_t len) {
// 检查offset+len是否越界
if (offset + len > size_) {
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/datastore/chunkserver_datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ CSErrorCode CSDataStore::CreateChunkFile(const ChunkOptions & options,

CSErrorCode CSDataStore::WriteChunk(ChunkID id,
SequenceNum sn,
const char * buf,
const butil::IOBuf& buf,
off_t offset,
size_t length,
uint32_t* cost,
Expand Down
18 changes: 17 additions & 1 deletion src/chunkserver/datastore/chunkserver_datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <bvar/bvar.h>
#include <glog/logging.h>
#include <butil/iobuf.h>
#include <string>
#include <vector>
#include <unordered_map>
Expand All @@ -45,6 +46,8 @@ using curve::fs::LocalFileSystem;
using ::curve::common::Atomic;
using CSChunkFilePtr = std::shared_ptr<CSChunkFile>;

inline void TrivialDeleter(void* ptr) {}

/**
* DataStore的配置参数
* baseDir:DataStore管理的目录路径
Expand Down Expand Up @@ -205,11 +208,24 @@ class CSDataStore {
*/
virtual CSErrorCode WriteChunk(ChunkID id,
SequenceNum sn,
const char * buf,
const butil::IOBuf& buf,
off_t offset,
size_t length,
uint32_t* cost,
const std::string & cloneSourceLocation = "");

// Deprecated, only use for unit & integration test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deprecated? 现在的测试中还是要用的吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

测试代码里还是用了很多这个接口,所以暂时没改掉。
这里备注了一下,正常情况下不应该使用这个接口,只有单元测试/集成测试需要。

virtual CSErrorCode WriteChunk(
ChunkID id, SequenceNum sn, const char* buf, off_t offset,
size_t length, uint32_t* cost,
const std::string& cloneSourceLocation = "") {
butil::IOBuf data;
data.append_user_data(const_cast<char*>(buf), length, TrivialDeleter);

return WriteChunk(id, sn, data, offset, length, cost,
cloneSourceLocation);
}

/**
* 创建克隆的Chunk,chunk中记录数据源位置信息
* 该接口需要保证幂等性,重复以相同参数进行创建返回成功
Expand Down
4 changes: 2 additions & 2 deletions src/chunkserver/op_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ void WriteChunkRequest::OnApply(uint64_t index,

auto ret = datastore_->WriteChunk(request_->chunkid(),
request_->sn(),
cntl_->request_attachment().to_string().c_str(), //NOLINT
cntl_->request_attachment(),
request_->offset(),
request_->size(),
&cost,
Expand Down Expand Up @@ -513,7 +513,7 @@ void WriteChunkRequest::OnApplyFromLog(std::shared_ptr<CSDataStore> datastore,

auto ret = datastore->WriteChunk(request.chunkid(),
request.sn(),
data.to_string().c_str(),
data,
request.offset(),
request.size(),
&cost,
Expand Down
3 changes: 2 additions & 1 deletion src/fs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ cc_library(
]),
deps = [
"//src/common:curve_common",
"//external:glog"
"//external:glog",
"//external:butil",
],
visibility = ["//visibility:public"],
)
27 changes: 27 additions & 0 deletions src/fs/ext4_filesystem_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,33 @@ int Ext4FileSystemImpl::Write(int fd,
return length;
}

int Ext4FileSystemImpl::Write(int fd,
butil::IOBuf buf,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const &

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是需要把iobuf写入到fd中,会涉及到对iobuf的修改,所以不能用const引用。
这里只拷贝了一次iobuf,开销不大。

uint64_t offset,
int length) {
int remainLength = length;
int relativeOffset = 0;
int retryTimes = 0;

while (remainLength > 0) {
ssize_t ret = buf.pcut_into_file_descriptor(fd, offset, remainLength);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

会分批写入文件吗

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

正常来说不会。这里内部还是用的pwritev来做的,有可能会返回EINTR。

if (ret < 0) {
if (errno == EINTR || retryTimes < MAX_RETYR_TIME) {
++retryTimes;
continue;
}
LOG(ERROR) << "IOBuf::pcut_into_file_descriptor failed: "
<< strerror(errno);
return -errno;
}

remainLength -= ret;
offset += ret;
}

return length;
}

int Ext4FileSystemImpl::Append(int fd,
const char *buf,
int length) {
Expand Down
5 changes: 4 additions & 1 deletion src/fs/ext4_filesystem_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#ifndef SRC_FS_EXT4_FILESYSTEM_IMPL_H_
#define SRC_FS_EXT4_FILESYSTEM_IMPL_H_

#include <butil/iobuf.h>

#include <map>
#include <memory>
#include <string>
#include <vector>
#include <map>

#include "src/fs/local_filesystem.h"
#include "src/fs/wrap_posix.h"
Expand All @@ -52,6 +54,7 @@ class Ext4FileSystemImpl : public LocalFileSystem {
int List(const string& dirPath, vector<std::string>* names) override;
int Read(int fd, char* buf, uint64_t offset, int length) override;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read这里返回也会从char*拷贝一次到iobuf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

读请求的buffer,会通过iobuf::append_user_data的方式由iobuf管理,这种情况不会拷贝。

int Write(int fd, const char* buf, uint64_t offset, int length) override;
int Write(int fd, butil::IOBuf buf, uint64_t offset, int length) override;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const &

int Append(int fd, const char* buf, int length) override;
int Fallocate(int fd, int op, uint64_t offset,
int length) override;
Expand Down
13 changes: 12 additions & 1 deletion src/fs/local_filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <inttypes.h>
#include <assert.h>
#include <sys/stat.h>
#include <butil/iobuf.h>
#include <memory>
#include <vector>
#include <map>
Expand Down Expand Up @@ -159,6 +160,17 @@ class LocalFileSystem {
*/
virtual int Write(int fd, const char* buf, uint64_t offset, int length) = 0;

/**
* 向文件指定区域写入数据
* @param fd:文件句柄id,通过Open接口获取
* @param buf:待写入数据
* @param offset:写入区域的起始偏移
* @param length:写入数据的长度
* @return 返回成功写入的数据长度,失败返回-1
*/
virtual int Write(int fd, butil::IOBuf buf, uint64_t offset,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

int length) = 0;

/**
* 向文件末尾追加数据
* @param fd:文件句柄id,通过Open接口获取
Expand Down Expand Up @@ -218,4 +230,3 @@ class LocalFsFactory {
} // namespace fs
} // namespace curve
#endif // SRC_FS_LOCAL_FILESYSTEM_H_

5 changes: 3 additions & 2 deletions test/chunkserver/conf_epoch_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::AnyNumber;
using ::testing::Matcher;
using ::testing::DoAll;
using ::testing::SetArgPointee;
using ::testing::SetArrayArgument;
Expand Down Expand Up @@ -188,7 +189,7 @@ TEST(ConfEpochFileTest, load_save) {
CopysetID loadCopysetID;
uint64_t loadEpoch;
EXPECT_CALL(*fs, Open(_, _)).Times(1).WillOnce(Return(10));
EXPECT_CALL(*fs, Write(_, _, _, _)).Times(1)
EXPECT_CALL(*fs, Write(_, Matcher<const char*>(_), _, _)).Times(1)
.WillOnce(Return(-1));
EXPECT_CALL(*fs, Close(_)).Times(1).WillOnce(Return(0));
ASSERT_EQ(-1, confEpochFile.Save(path,
Expand All @@ -204,7 +205,7 @@ TEST(ConfEpochFileTest, load_save) {
= std::make_shared<MockLocalFileSystem>();
ConfEpochFile confEpochFile(fs);
EXPECT_CALL(*fs, Open(_, _)).Times(1).WillOnce(Return(10));
EXPECT_CALL(*fs, Write(_, _, _, _)).Times(1)
EXPECT_CALL(*fs, Write(_, Matcher<const char*>(_), _, _)).Times(1)
.WillOnce(Return(jsonStr.size()));
EXPECT_CALL(*fs, Close(_)).Times(1).WillOnce(Return(0));
EXPECT_CALL(*fs, Fsync(_)).Times(1).WillOnce(Return(-1));
Expand Down
5 changes: 3 additions & 2 deletions test/chunkserver/copyset_node_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::AnyNumber;
using ::testing::Matcher;
using ::testing::DoAll;
using ::testing::SetArgPointee;
using ::testing::SetArgReferee;
Expand Down Expand Up @@ -186,7 +187,7 @@ TEST_F(CopysetNodeTest, error_test) {
copysetNode.SetLocalFileSystem(mockfs);
copysetNode.SetConfEpochFile(std::move(epochFile));
EXPECT_CALL(*mockfs, Open(_, _)).Times(1).WillOnce(Return(10));
EXPECT_CALL(*mockfs, Write(_, _, _, _)).Times(1)
EXPECT_CALL(*mockfs, Write(_, Matcher<const char*>(_), _, _)).Times(1)
.WillOnce(Return(jsonStr.size()));
EXPECT_CALL(*mockfs, Fsync(_)).Times(1).WillOnce(Return(0));
EXPECT_CALL(*mockfs, Close(_)).Times(1).WillOnce(Return(0));
Expand Down Expand Up @@ -245,7 +246,7 @@ TEST_F(CopysetNodeTest, error_test) {
copysetNode.SetLocalFileSystem(mockfs);
copysetNode.SetConfEpochFile(std::move(epochFile));
EXPECT_CALL(*mockfs, Open(_, _)).Times(1).WillOnce(Return(10));
EXPECT_CALL(*mockfs, Write(_, _, _, _)).Times(1)
EXPECT_CALL(*mockfs, Write(_, Matcher<const char*>(_), _, _)).Times(1)
.WillOnce(Return(jsonStr.size()));
EXPECT_CALL(*mockfs, Fsync(_)).Times(1).WillOnce(Return(0));
EXPECT_CALL(*mockfs, Close(_)).Times(1).WillOnce(Return(0));
Expand Down
16 changes: 10 additions & 6 deletions test/chunkserver/datastore/chunkfilepool_mock_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ using ::testing::Ge;
using ::testing::Gt;
using ::testing::Return;
using ::testing::NotNull;
using ::testing::Matcher;
using ::testing::Mock;
using ::testing::Truly;
using ::testing::DoAll;
Expand Down Expand Up @@ -161,7 +162,7 @@ TEST_F(CSChunkfilePoolMockTest, PersistEnCodeMetaInfoTest) {
{
EXPECT_CALL(*lfs_, Open(poolMetaPath, _))
.WillOnce(Return(-1));
EXPECT_CALL(*lfs_, Write(_, _, _, _))
EXPECT_CALL(*lfs_, Write(_, Matcher<const char*>(_), _, _))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥这些还是会带char*的参数?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是因为写入操作可能会与快照的metapage有关,更新metapage调用的是Write(_, const char*,....)这个接口。

.Times(0);
EXPECT_CALL(*lfs_, Close(_))
.Times(0);
Expand All @@ -176,7 +177,7 @@ TEST_F(CSChunkfilePoolMockTest, PersistEnCodeMetaInfoTest) {
{
EXPECT_CALL(*lfs_, Open(poolMetaPath, _))
.WillOnce(Return(1));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, 4096))
EXPECT_CALL(*lfs_, Write(1, Matcher<const char*>(NotNull()), 0, 4096))
.WillOnce(Return(-1));
EXPECT_CALL(*lfs_, Close(1))
.Times(1);
Expand All @@ -191,7 +192,7 @@ TEST_F(CSChunkfilePoolMockTest, PersistEnCodeMetaInfoTest) {
{
EXPECT_CALL(*lfs_, Open(poolMetaPath, _))
.WillOnce(Return(1));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, 4096))
EXPECT_CALL(*lfs_, Write(1, Matcher<const char*>(NotNull()), 0, 4096))
.WillOnce(Return(4096));
EXPECT_CALL(*lfs_, Close(1))
.Times(1);
Expand Down Expand Up @@ -770,7 +771,8 @@ TEST_F(CSChunkfilePoolMockTest, GetChunkTest) {
EXPECT_CALL(*lfs_, Fallocate(1, 0, 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(0));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, fileSize))
EXPECT_CALL(*lfs_,
Write(1, Matcher<const char*>(NotNull()), 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(-1));
EXPECT_CALL(*lfs_, Close(1))
Expand All @@ -787,7 +789,8 @@ TEST_F(CSChunkfilePoolMockTest, GetChunkTest) {
EXPECT_CALL(*lfs_, Fallocate(1, 0, 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(0));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, fileSize))
EXPECT_CALL(*lfs_,
Write(1, Matcher<const char*>(NotNull()), 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(fileSize));
EXPECT_CALL(*lfs_, Fsync(1))
Expand All @@ -807,7 +810,8 @@ TEST_F(CSChunkfilePoolMockTest, GetChunkTest) {
EXPECT_CALL(*lfs_, Fallocate(1, 0, 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(0));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, fileSize))
EXPECT_CALL(*lfs_,
Write(1, Matcher<const char*>(NotNull()), 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(fileSize));
EXPECT_CALL(*lfs_, Fsync(1))
Expand Down
Loading