11#ifndef KIKIMR_DISABLE_S3_OPS
22
3- #include " export_s3_buffer .h"
3+ #include " export_s3_buffer_raw .h"
44#include " type_serialization.h"
55
6- #include < ydb/core/backup/common/checksum.h>
76#include < ydb/core/tablet_flat/flat_row_state.h>
87#include < yql/essentials/types/binary_json/read.h>
98#include < ydb/public/lib/scheme_types/scheme_type_id.h>
109
1110#include < library/cpp/string_utils/quote/quote.h>
1211
1312#include < util/datetime/base.h>
14- #include < util/generic/buffer.h>
1513#include < util/stream/buffer.h>
1614
17- #include < contrib/libs/zstd/include/zstd.h>
15+ namespace NKikimr {
16+ namespace NDataShard {
1817
19-
20- namespace NKikimr ::NDataShard {
21-
22- namespace {
23-
24- struct DestroyZCtx {
25- static void Destroy (::ZSTD_CCtx* p) noexcept {
26- ZSTD_freeCCtx (p);
27- }
28- };
29-
30- class TZStdCompressionProcessor {
31- public:
32- using TPtr = THolder<TZStdCompressionProcessor>;
33-
34- explicit TZStdCompressionProcessor (const TS3ExportBufferSettings::TCompressionSettings& settings);
35-
36- TString GetError () const {
37- return ZSTD_getErrorName (ErrorCode);
38- }
39-
40- bool AddData (TStringBuf data);
41-
42- TMaybe<TBuffer> Flush (bool prepare);
43-
44- private:
45- enum ECompressionResult {
46- CONTINUE,
47- DONE,
48- ERROR,
49- };
50-
51- ECompressionResult Compress (ZSTD_inBuffer* input, ZSTD_EndDirective endOp);
52- void Reset ();
53-
54- private:
55- const int CompressionLevel;
56- THolder<::ZSTD_CCtx, DestroyZCtx> Context;
57- size_t ErrorCode = 0 ;
58- TBuffer Buffer;
59- ui64 BytesAdded = 0 ;
60- };
61-
62- class TS3Buffer : public NExportScan ::IBuffer {
63- using TTagToColumn = IExport::TTableColumns;
64- using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow
65-
66- public:
67- explicit TS3Buffer (TS3ExportBufferSettings&& settings);
68-
69- void ColumnsOrder (const TVector<ui32>& tags) override ;
70- bool Collect (const NTable::IScan::TRow& row) override ;
71- IEventBase* PrepareEvent (bool last, NExportScan::IBuffer::TStats& stats) override ;
72- void Clear () override ;
73- bool IsFilled () const override ;
74- TString GetError () const override ;
75-
76- private:
77- inline ui64 GetRowsLimit () const { return RowsLimit; }
78- inline ui64 GetBytesLimit () const { return MaxBytes; }
79-
80- bool Collect (const NTable::IScan::TRow& row, IOutputStream& out);
81- virtual TMaybe<TBuffer> Flush (bool prepare);
82-
83- static NBackup::IChecksum* CreateChecksum (const TMaybe<TS3ExportBufferSettings::TChecksumSettings>& settings);
84- static TZStdCompressionProcessor* CreateCompression (const TMaybe<TS3ExportBufferSettings::TCompressionSettings>& settings);
85-
86- private:
87- const TTagToColumn Columns;
88- const ui64 RowsLimit;
89- const ui64 MinBytes;
90- const ui64 MaxBytes;
91-
92- TTagToIndex Indices;
93-
94- protected:
95- ui64 Rows = 0 ;
96- ui64 BytesRead = 0 ;
97- TBuffer Buffer;
98-
99- NBackup::IChecksum::TPtr Checksum;
100- TZStdCompressionProcessor::TPtr Compression;
101-
102- TString ErrorString;
103- }; // TS3Buffer
104-
105- TS3Buffer::TS3Buffer (TS3ExportBufferSettings&& settings)
106- : Columns(std::move(settings.Columns))
107- , RowsLimit(settings.MaxRows)
108- , MinBytes(settings.MinBytes)
109- , MaxBytes(settings.MaxBytes)
110- , Checksum(CreateChecksum(settings.ChecksumSettings))
111- , Compression(CreateCompression(settings.CompressionSettings))
18+ TS3BufferRaw::TS3BufferRaw (const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
19+ : Columns(columns)
20+ , RowsLimit(rowsLimit)
21+ , BytesLimit(bytesLimit)
22+ , Rows(0 )
23+ , BytesRead(0 )
24+ , Checksum(enableChecksums ? NBackup::CreateChecksum() : nullptr )
11225{
11326}
11427
115- NBackup::IChecksum* TS3Buffer::CreateChecksum (const TMaybe<TS3ExportBufferSettings::TChecksumSettings>& settings) {
116- if (settings) {
117- switch (settings->ChecksumType ) {
118- case TS3ExportBufferSettings::TChecksumSettings::EChecksumType::Sha256:
119- return NBackup::CreateChecksum ();
120- }
121- }
122- return nullptr ;
123- }
124-
125- TZStdCompressionProcessor* TS3Buffer::CreateCompression (const TMaybe<TS3ExportBufferSettings::TCompressionSettings>& settings) {
126- if (settings) {
127- switch (settings->Algorithm ) {
128- case TS3ExportBufferSettings::TCompressionSettings::EAlgorithm::Zstd:
129- return new TZStdCompressionProcessor (*settings);
130- }
131- }
132- return nullptr ;
133- }
134-
135- void TS3Buffer::ColumnsOrder (const TVector<ui32>& tags) {
28+ void TS3BufferRaw::ColumnsOrder (const TVector<ui32>& tags) {
13629 Y_ABORT_UNLESS (tags.size () == Columns.size ());
13730
13831 Indices.clear ();
@@ -144,7 +37,7 @@ void TS3Buffer::ColumnsOrder(const TVector<ui32>& tags) {
14437 }
14538}
14639
147- bool TS3Buffer ::Collect (const NTable::IScan::TRow& row, IOutputStream& out) {
40+ bool TS3BufferRaw ::Collect (const NTable::IScan::TRow& row, IOutputStream& out) {
14841 bool needsComma = false ;
14942 for (const auto & [tag, column] : Columns) {
15043 auto it = Indices.find (tag);
@@ -259,7 +152,7 @@ bool TS3Buffer::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
259152 return true ;
260153}
261154
262- bool TS3Buffer ::Collect (const NTable::IScan::TRow& row) {
155+ bool TS3BufferRaw ::Collect (const NTable::IScan::TRow& row) {
263156 TBufferOutput out (Buffer);
264157 ErrorString.clear ();
265158
@@ -268,24 +161,14 @@ bool TS3Buffer::Collect(const NTable::IScan::TRow& row) {
268161 return false ;
269162 }
270163
271- TStringBuf data (Buffer.Data (), Buffer.Size ());
272- data = data.Tail (beforeSize);
273-
274- // Apply checksum
275164 if (Checksum) {
276- Checksum->AddData (data);
277- }
278-
279- // Compress
280- if (Compression && !Compression->AddData (data)) {
281- ErrorString = Compression->GetError ();
282- return false ;
165+ TStringBuf data (Buffer.Data (), Buffer.Size ());
166+ Checksum->AddData (data.Tail (beforeSize));
283167 }
284-
285168 return true ;
286169}
287170
288- IEventBase* TS3Buffer ::PrepareEvent (bool last, NExportScan::IBuffer::TStats& stats) {
171+ IEventBase* TS3BufferRaw ::PrepareEvent (bool last, NExportScan::IBuffer::TStats& stats) {
289172 stats.Rows = Rows;
290173 stats.BytesRead = BytesRead;
291174
@@ -303,108 +186,31 @@ IEventBase* TS3Buffer::PrepareEvent(bool last, NExportScan::IBuffer::TStats& sta
303186 }
304187}
305188
306- void TS3Buffer ::Clear () {
189+ void TS3BufferRaw ::Clear () {
307190 Y_ABORT_UNLESS (Flush (false ));
308191}
309192
310- bool TS3Buffer::IsFilled () const {
311- if (Buffer.Size () < MinBytes) {
312- return false ;
313- }
314-
193+ bool TS3BufferRaw::IsFilled () const {
315194 return Rows >= GetRowsLimit () || Buffer.Size () >= GetBytesLimit ();
316195}
317196
318- TString TS3Buffer ::GetError () const {
197+ TString TS3BufferRaw ::GetError () const {
319198 return ErrorString;
320199}
321200
322- TMaybe<TBuffer> TS3Buffer ::Flush (bool prepare ) {
201+ TMaybe<TBuffer> TS3BufferRaw ::Flush (bool ) {
323202 Rows = 0 ;
324203 BytesRead = 0 ;
325-
326- // Compression finishes compression frame during Flush
327- // so that last table row borders equal to compression frame borders.
328- // This full finished block must then be encrypted so that encryption frame
329- // has the same borders.
330- // It allows to import data in batches and save its state during import.
331-
332- if (Compression) {
333- TMaybe<TBuffer> compressedBuffer = Compression->Flush (prepare);
334- if (!compressedBuffer) {
335- return Nothing ();
336- }
337-
338- Buffer = std::move (*compressedBuffer);
339- }
340-
341204 return std::exchange (Buffer, TBuffer ());
342205}
343206
344- TZStdCompressionProcessor::TZStdCompressionProcessor (const TS3ExportBufferSettings::TCompressionSettings& settings)
345- : CompressionLevel(settings.CompressionLevel)
346- , Context(ZSTD_createCCtx())
207+ NExportScan::IBuffer* CreateS3ExportBufferRaw (
208+ const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
347209{
210+ return new TS3BufferRaw (columns, rowsLimit, bytesLimit, enableChecksums);
348211}
349212
350- bool TZStdCompressionProcessor::AddData (TStringBuf data) {
351- BytesAdded += data.size ();
352- auto input = ZSTD_inBuffer{data.data (), data.size (), 0 };
353- while (input.pos < input.size ) {
354- if (ERROR == Compress (&input, ZSTD_e_continue)) {
355- return false ;
356- }
357- }
358-
359- return true ;
360- }
361-
362- TMaybe<TBuffer> TZStdCompressionProcessor::Flush (bool prepare) {
363- if (prepare && BytesAdded) {
364- ECompressionResult res;
365- auto input = ZSTD_inBuffer{NULL , 0 , 0 };
366-
367- do {
368- if (res = Compress (&input, ZSTD_e_end); res == ERROR) {
369- return Nothing ();
370- }
371- } while (res != DONE);
372- }
373-
374- Reset ();
375- return std::exchange (Buffer, TBuffer ());
376- }
377-
378- TZStdCompressionProcessor::ECompressionResult TZStdCompressionProcessor::Compress (ZSTD_inBuffer* input, ZSTD_EndDirective endOp) {
379- auto output = ZSTD_outBuffer{Buffer.Data (), Buffer.Capacity (), Buffer.Size ()};
380- auto res = ZSTD_compressStream2 (Context.Get (), &output, input, endOp);
381-
382- if (ZSTD_isError (res)) {
383- ErrorCode = res;
384- return ERROR;
385- }
386-
387- if (res > 0 ) {
388- Buffer.Reserve (output.pos + res);
389- }
390-
391- Buffer.Proceed (output.pos );
392- return res ? CONTINUE : DONE;
393- }
394-
395- void TZStdCompressionProcessor::Reset () {
396- BytesAdded = 0 ;
397- ZSTD_CCtx_reset (Context.Get (), ZSTD_reset_session_only);
398- ZSTD_CCtx_refCDict (Context.Get (), NULL );
399- ZSTD_CCtx_setParameter (Context.Get (), ZSTD_c_compressionLevel, CompressionLevel);
400- }
401-
402- } // anonymous
403-
404- NExportScan::IBuffer* CreateS3ExportBuffer (TS3ExportBufferSettings&& settings) {
405- return new TS3Buffer (std::move (settings));
406- }
407-
408- } // namespace NKikimr::NDataShard
213+ } // NDataShard
214+ } // NKikimr
409215
410216#endif // KIKIMR_DISABLE_S3_OPS
0 commit comments