Skip to content

Commit e85baaf

Browse files
authored
Merge f881f21 into 9bcbbd6
2 parents 9bcbbd6 + f881f21 commit e85baaf

File tree

4 files changed

+122
-29
lines changed

4 files changed

+122
-29
lines changed

ydb/library/yql/core/qplayer/storage/file/ut/yql_qstorage_file_ut.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,22 @@
66

77
using namespace NYql;
88

9-
Y_UNIT_TEST_SUITE(TQStorageFileTests) {
10-
GENERATE_TESTS(MakeFileQStorage)
9+
IQStoragePtr MakeBufferedFileQStorage() {
10+
TFileQStorageSettings settings;
11+
settings.BufferUntilCommit = true;
12+
return MakeFileQStorage({}, settings);
13+
}
14+
15+
IQStoragePtr MakeUnbufferedFileQStorage() {
16+
TFileQStorageSettings settings;
17+
settings.BufferUntilCommit = false;
18+
return MakeFileQStorage({}, settings);
19+
}
20+
21+
Y_UNIT_TEST_SUITE(TQStorageBufferedFileTests) {
22+
GENERATE_TESTS(MakeBufferedFileQStorage)
23+
}
24+
25+
Y_UNIT_TEST_SUITE(TQStorageUnbufferedFileTests) {
26+
GENERATE_TESTS(MakeUnbufferedFileQStorage)
1127
}

ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp

Lines changed: 98 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,44 @@ namespace NYql {
1414

1515
namespace {
1616

17-
class TWriter : public IQWriter {
18-
public:
19-
TWriter(TFsPath& path)
17+
class TWriterBase : public IQWriter {
18+
protected:
19+
TWriterBase(TFsPath& path, TInstant writtenAt)
2020
: Path_(path)
21+
, WrittenAt_(writtenAt)
22+
{}
23+
24+
protected:
25+
void WriteIndex(ui64 totalItems, ui64 totalBytes, ui64 checksum) const {
26+
TFileOutput indexFile(Path_.GetPath() + ".idx.tmp");
27+
indexFile.Write(&WrittenAt_, sizeof(WrittenAt_));
28+
indexFile.Write(&totalItems, sizeof(totalItems));
29+
indexFile.Write(&totalBytes, sizeof(totalBytes));
30+
indexFile.Write(&checksum, sizeof(checksum));
31+
indexFile.Finish();
32+
if (!NFs::Rename(Path_.GetPath() + ".idx.tmp", Path_.GetPath() + ".idx")) {
33+
throw yexception() << "can not rename: " << LastSystemErrorText();
34+
}
35+
}
36+
37+
static void SaveString(TFileOutput& file, const TString& str, ui64& totalBytes, ui64& checksum) {
38+
ui32 length = str.Size();
39+
checksum = crc64(&length, sizeof(length), checksum);
40+
file.Write(&length, sizeof(length));
41+
checksum = crc64(str.Data(), length, checksum);
42+
file.Write(str.Data(), length);
43+
totalBytes += length;
44+
}
45+
46+
protected:
47+
const TFsPath Path_;
48+
const TInstant WrittenAt_;
49+
};
50+
51+
class TBufferedWriter : public TWriterBase {
52+
public:
53+
TBufferedWriter(TFsPath& path, TInstant writtenAt)
54+
: TWriterBase(path, writtenAt)
2155
, Storage_(MakeMemoryQStorage())
2256
, Writer_(Storage_->MakeWriter("", {}))
2357
{
@@ -36,6 +70,7 @@ class TWriter : public IQWriter {
3670
private:
3771
void SaveFile(const IQIteratorPtr& iterator) {
3872
TFileOutput dataFile(Path_.GetPath() + ".dat");
73+
dataFile.Write(&WrittenAt_, sizeof(WrittenAt_));
3974
ui64 totalItems = 0;
4075
ui64 totalBytes = 0;
4176
ui64 checksum = 0;
@@ -52,45 +87,77 @@ class TWriter : public IQWriter {
5287
}
5388

5489
dataFile.Finish();
55-
TFileOutput indexFile(Path_.GetPath() + ".idx.tmp");
56-
indexFile.Write(&totalItems, sizeof(totalItems));
57-
indexFile.Write(&totalBytes, sizeof(totalBytes));
58-
indexFile.Write(&checksum, sizeof(checksum));
59-
if (!NFs::Rename(Path_.GetPath() + ".idx.tmp", Path_.GetPath() + ".idx")) {
60-
throw yexception() << "can not rename: " << LastSystemErrorText();
90+
WriteIndex(totalItems, totalBytes, checksum);
91+
}
92+
93+
private:
94+
const IQStoragePtr Storage_;
95+
const IQWriterPtr Writer_;
96+
};
97+
98+
class TUnbufferedWriter : public TWriterBase {
99+
public:
100+
TUnbufferedWriter(TFsPath& path, TInstant writtenAt)
101+
: TWriterBase(path, writtenAt)
102+
, DataFile_(Path_.GetPath() + ".dat")
103+
{
104+
DataFile_.Write(&WrittenAt_, sizeof(WrittenAt_));
105+
}
106+
107+
NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final {
108+
with_lock(Mutex_) {
109+
Y_ENSURE(!Committed_);
110+
if (Keys_.emplace(key).second) {
111+
SaveString(DataFile_, key.Component, TotalBytes_, Checksum_);
112+
SaveString(DataFile_, key.Label, TotalBytes_, Checksum_);
113+
SaveString(DataFile_, value, TotalBytes_, Checksum_);
114+
++TotalItems_;
115+
}
116+
117+
return NThreading::MakeFuture();
61118
}
62119
}
63120

64-
void SaveString(TFileOutput& file, const TString& str, ui64& totalBytes, ui64& checksum) {
65-
ui32 length = str.Size();
66-
checksum = crc64(&length, sizeof(length), checksum);
67-
file.Write(&length, sizeof(length));
68-
checksum = crc64(str.Data(), length, checksum);
69-
file.Write(str.Data(), length);
70-
totalBytes += length;
121+
NThreading::TFuture<void> Commit() final {
122+
with_lock(Mutex_) {
123+
Y_ENSURE(!Committed_);
124+
Committed_ = true;
125+
DataFile_.Finish();
126+
WriteIndex(TotalItems_, TotalBytes_, Checksum_);
127+
return NThreading::MakeFuture();
128+
}
71129
}
72130

73131
private:
74-
const TFsPath Path_;
75-
const IQStoragePtr Storage_;
76-
const IQWriterPtr Writer_;
132+
TMutex Mutex_;
133+
TFileOutput DataFile_;
134+
ui64 TotalItems_ = 0;
135+
ui64 TotalBytes_ = 0;
136+
ui64 Checksum_ = 0;
137+
THashSet<TQItemKey> Keys_;
138+
bool Committed_ = false;
77139
};
78140

79141
class TStorage : public IQStorage {
80142
public:
81-
TStorage(const TString& folder)
143+
TStorage(const TString& folder, const TFileQStorageSettings& settings)
82144
: Folder_(folder)
145+
, Settings_(settings)
83146
{
84147
if (!Folder_.IsDefined()) {
85148
TmpDir_.ConstructInPlace();
86149
Folder_ = TmpDir_->Path();
87150
}
88151
}
89152

90-
IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& settings) const final {
91-
Y_UNUSED(settings);
153+
IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& writerSettings) const final {
92154
auto opPath = Folder_ / operationId;
93-
return std::make_shared<TWriter>(opPath);
155+
auto writtenAt = writerSettings.WrittenAt.GetOrElse(Now());
156+
if (Settings_.BufferUntilCommit) {
157+
return std::make_shared<TBufferedWriter>(opPath, writtenAt);
158+
} else {
159+
return std::make_shared<TUnbufferedWriter>(opPath, writtenAt);
160+
}
94161
}
95162

96163
IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& settings) const final {
@@ -115,14 +182,19 @@ class TStorage : public IQStorage {
115182

116183
auto writer = memory->MakeWriter("", {});
117184
TFileInput indexFile(indexPath.GetPath());
185+
TInstant indexWrittenAt;
118186
ui64 totalItems, loadedTotalBytes, loadedChecksum;
187+
indexFile.LoadOrFail(&indexWrittenAt, sizeof(indexWrittenAt));
119188
indexFile.LoadOrFail(&totalItems, sizeof(totalItems));
120189
indexFile.LoadOrFail(&loadedTotalBytes, sizeof(loadedTotalBytes));
121190
indexFile.LoadOrFail(&loadedChecksum, sizeof(loadedChecksum));
122191
char dummy;
123192
Y_ENSURE(!indexFile.ReadChar(dummy));
124193
const TFsPath& dataPath = Folder_ / (operationId + ".dat");
125194
TFileInput dataFile(dataPath.GetPath());
195+
TInstant dataWrittenAt;
196+
dataFile.LoadOrFail(&dataWrittenAt, sizeof(dataWrittenAt));
197+
Y_ENSURE(indexWrittenAt == dataWrittenAt);
126198
ui64 totalBytes = 0, checksum = 0;
127199
for (ui64 i = 0; i < totalItems; ++i) {
128200
TQItemKey key;
@@ -160,12 +232,13 @@ class TStorage : public IQStorage {
160232
private:
161233
TMaybe<TTempDir> TmpDir_;
162234
TFsPath Folder_;
235+
const TFileQStorageSettings Settings_;
163236
};
164237

165238
}
166239

167-
IQStoragePtr MakeFileQStorage(const TString& folder) {
168-
return std::make_shared<TStorage>(folder);
240+
IQStoragePtr MakeFileQStorage(const TString& folder, const TFileQStorageSettings& settings) {
241+
return std::make_shared<TStorage>(folder, settings);
169242
}
170243

171244
};

ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33

44
namespace NYql {
55

6-
IQStoragePtr MakeFileQStorage(const TString& folder = {});
6+
struct TFileQStorageSettings {
7+
bool BufferUntilCommit = true;
8+
};
9+
10+
IQStoragePtr MakeFileQStorage(const TString& folder = {}, const TFileQStorageSettings& settings = {});
711

812
};

ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class TWriter : public IQWriter {
5353
NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final {
5454
with_lock(Operation_->Mutex) {
5555
Y_ENSURE(!Operation_->Committed);
56-
(*Operation_->WriteMap)[key] = value;
56+
Operation_->WriteMap->emplace(key, value);
5757
return NThreading::MakeFuture();
5858
}
5959
}

0 commit comments

Comments
 (0)