@@ -14,10 +14,44 @@ namespace NYql {
14
14
15
15
namespace {
16
16
17
- class TWriter : public IQWriter {
18
- public :
19
- TWriter (TFsPath& path)
17
+ class TWriterBase : public IQWriter {
18
+ protected :
19
+ TWriterBase (TFsPath& path, TInstant writtenAt )
20
20
: Path_(path)
21
+ , WrittenAt_(writtenAt)
22
+ {}
23
+
24
+ protected:
25
+ void WriteIndex (ui64 totalItems, ui64 totalBytes, ui64 checksum) const {
26
+ TFileOutput indexFile (Path_.GetPath () + " .idx.tmp" );
27
+ indexFile.Write (&WrittenAt_, sizeof (WrittenAt_));
28
+ indexFile.Write (&totalItems, sizeof (totalItems));
29
+ indexFile.Write (&totalBytes, sizeof (totalBytes));
30
+ indexFile.Write (&checksum, sizeof (checksum));
31
+ indexFile.Finish ();
32
+ if (!NFs::Rename (Path_.GetPath () + " .idx.tmp" , Path_.GetPath () + " .idx" )) {
33
+ throw yexception () << " can not rename: " << LastSystemErrorText ();
34
+ }
35
+ }
36
+
37
+ static void SaveString (TFileOutput& file, const TString& str, ui64& totalBytes, ui64& checksum) {
38
+ ui32 length = str.Size ();
39
+ checksum = crc64 (&length, sizeof (length), checksum);
40
+ file.Write (&length, sizeof (length));
41
+ checksum = crc64 (str.Data (), length, checksum);
42
+ file.Write (str.Data (), length);
43
+ totalBytes += length;
44
+ }
45
+
46
+ protected:
47
+ const TFsPath Path_;
48
+ const TInstant WrittenAt_;
49
+ };
50
+
51
+ class TBufferedWriter : public TWriterBase {
52
+ public:
53
+ TBufferedWriter (TFsPath& path, TInstant writtenAt)
54
+ : TWriterBase(path, writtenAt)
21
55
, Storage_(MakeMemoryQStorage())
22
56
, Writer_(Storage_->MakeWriter (" " , {}))
23
57
{
@@ -36,6 +70,7 @@ class TWriter : public IQWriter {
36
70
private:
37
71
void SaveFile (const IQIteratorPtr& iterator) {
38
72
TFileOutput dataFile (Path_.GetPath () + " .dat" );
73
+ dataFile.Write (&WrittenAt_, sizeof (WrittenAt_));
39
74
ui64 totalItems = 0 ;
40
75
ui64 totalBytes = 0 ;
41
76
ui64 checksum = 0 ;
@@ -52,45 +87,77 @@ class TWriter : public IQWriter {
52
87
}
53
88
54
89
dataFile.Finish ();
55
- TFileOutput indexFile (Path_.GetPath () + " .idx.tmp" );
56
- indexFile.Write (&totalItems, sizeof (totalItems));
57
- indexFile.Write (&totalBytes, sizeof (totalBytes));
58
- indexFile.Write (&checksum, sizeof (checksum));
59
- if (!NFs::Rename (Path_.GetPath () + " .idx.tmp" , Path_.GetPath () + " .idx" )) {
60
- throw yexception () << " can not rename: " << LastSystemErrorText ();
90
+ WriteIndex (totalItems, totalBytes, checksum);
91
+ }
92
+
93
+ private:
94
+ const IQStoragePtr Storage_;
95
+ const IQWriterPtr Writer_;
96
+ };
97
+
98
+ class TUnbufferedWriter : public TWriterBase {
99
+ public:
100
+ TUnbufferedWriter (TFsPath& path, TInstant writtenAt)
101
+ : TWriterBase(path, writtenAt)
102
+ , DataFile_(Path_.GetPath() + " .dat" )
103
+ {
104
+ DataFile_.Write (&WrittenAt_, sizeof (WrittenAt_));
105
+ }
106
+
107
+ NThreading::TFuture<void > Put (const TQItemKey& key, const TString& value) final {
108
+ with_lock (Mutex_) {
109
+ Y_ENSURE (!Committed_);
110
+ if (Keys_.emplace (key).second ) {
111
+ SaveString (DataFile_, key.Component , TotalBytes_, Checksum_);
112
+ SaveString (DataFile_, key.Label , TotalBytes_, Checksum_);
113
+ SaveString (DataFile_, value, TotalBytes_, Checksum_);
114
+ ++TotalItems_;
115
+ }
116
+
117
+ return NThreading::MakeFuture ();
61
118
}
62
119
}
63
120
64
- void SaveString (TFileOutput& file, const TString& str, ui64& totalBytes, ui64& checksum) {
65
- ui32 length = str.Size ();
66
- checksum = crc64 (&length, sizeof (length), checksum);
67
- file.Write (&length, sizeof (length));
68
- checksum = crc64 (str.Data (), length, checksum);
69
- file.Write (str.Data (), length);
70
- totalBytes += length;
121
+ NThreading::TFuture<void > Commit () final {
122
+ with_lock (Mutex_) {
123
+ Y_ENSURE (!Committed_);
124
+ Committed_ = true ;
125
+ DataFile_.Finish ();
126
+ WriteIndex (TotalItems_, TotalBytes_, Checksum_);
127
+ return NThreading::MakeFuture ();
128
+ }
71
129
}
72
130
73
131
private:
74
- const TFsPath Path_;
75
- const IQStoragePtr Storage_;
76
- const IQWriterPtr Writer_;
132
+ TMutex Mutex_;
133
+ TFileOutput DataFile_;
134
+ ui64 TotalItems_ = 0 ;
135
+ ui64 TotalBytes_ = 0 ;
136
+ ui64 Checksum_ = 0 ;
137
+ THashSet<TQItemKey> Keys_;
138
+ bool Committed_ = false ;
77
139
};
78
140
79
141
class TStorage : public IQStorage {
80
142
public:
81
- TStorage (const TString& folder)
143
+ TStorage (const TString& folder, const TFileQStorageSettings& settings )
82
144
: Folder_(folder)
145
+ , Settings_(settings)
83
146
{
84
147
if (!Folder_.IsDefined ()) {
85
148
TmpDir_.ConstructInPlace ();
86
149
Folder_ = TmpDir_->Path ();
87
150
}
88
151
}
89
152
90
- IQWriterPtr MakeWriter (const TString& operationId, const TQWriterSettings& settings) const final {
91
- Y_UNUSED (settings);
153
+ IQWriterPtr MakeWriter (const TString& operationId, const TQWriterSettings& writerSettings) const final {
92
154
auto opPath = Folder_ / operationId;
93
- return std::make_shared<TWriter>(opPath);
155
+ auto writtenAt = writerSettings.WrittenAt .GetOrElse (Now ());
156
+ if (Settings_.BufferUntilCommit ) {
157
+ return std::make_shared<TBufferedWriter>(opPath, writtenAt);
158
+ } else {
159
+ return std::make_shared<TUnbufferedWriter>(opPath, writtenAt);
160
+ }
94
161
}
95
162
96
163
IQReaderPtr MakeReader (const TString& operationId, const TQReaderSettings& settings) const final {
@@ -115,14 +182,19 @@ class TStorage : public IQStorage {
115
182
116
183
auto writer = memory->MakeWriter (" " , {});
117
184
TFileInput indexFile (indexPath.GetPath ());
185
+ TInstant indexWrittenAt;
118
186
ui64 totalItems, loadedTotalBytes, loadedChecksum;
187
+ indexFile.LoadOrFail (&indexWrittenAt, sizeof (indexWrittenAt));
119
188
indexFile.LoadOrFail (&totalItems, sizeof (totalItems));
120
189
indexFile.LoadOrFail (&loadedTotalBytes, sizeof (loadedTotalBytes));
121
190
indexFile.LoadOrFail (&loadedChecksum, sizeof (loadedChecksum));
122
191
char dummy;
123
192
Y_ENSURE (!indexFile.ReadChar (dummy));
124
193
const TFsPath& dataPath = Folder_ / (operationId + " .dat" );
125
194
TFileInput dataFile (dataPath.GetPath ());
195
+ TInstant dataWrittenAt;
196
+ dataFile.LoadOrFail (&dataWrittenAt, sizeof (dataWrittenAt));
197
+ Y_ENSURE (indexWrittenAt == dataWrittenAt);
126
198
ui64 totalBytes = 0 , checksum = 0 ;
127
199
for (ui64 i = 0 ; i < totalItems; ++i) {
128
200
TQItemKey key;
@@ -160,12 +232,13 @@ class TStorage : public IQStorage {
160
232
private:
161
233
TMaybe<TTempDir> TmpDir_;
162
234
TFsPath Folder_;
235
+ const TFileQStorageSettings Settings_;
163
236
};
164
237
165
238
}
166
239
167
- IQStoragePtr MakeFileQStorage (const TString& folder) {
168
- return std::make_shared<TStorage>(folder);
240
+ IQStoragePtr MakeFileQStorage (const TString& folder, const TFileQStorageSettings& settings ) {
241
+ return std::make_shared<TStorage>(folder, settings );
169
242
}
170
243
171
244
};
0 commit comments