19 changes: 3 additions & 16 deletions ydb/core/tx/datashard/export_s3.h
@@ -29,28 +29,15 @@ class TS3Export: public IExport {
const ui64 maxBytes = scanSettings.GetBytesBatchSize();
const ui64 minBytes = Task.GetS3Settings().GetLimits().GetMinWriteBatchSize();

TS3ExportBufferSettings bufferSettings;
bufferSettings
.WithColumns(Columns)
.WithMaxRows(maxRows)
.WithMaxBytes(maxBytes);
if (Task.GetEnableChecksums()) {
bufferSettings.WithChecksum(TS3ExportBufferSettings::Sha256Checksum());
}

switch (CodecFromTask(Task)) {
case ECompressionCodec::None:
break;
return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes, Task.GetEnableChecksums());
case ECompressionCodec::Zstd:
bufferSettings
.WithMinBytes(minBytes)
.WithCompression(TS3ExportBufferSettings::ZstdCompression(Task.GetCompression().GetLevel()));
break;
return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows,
maxBytes, minBytes, Task.GetEnableChecksums());
case ECompressionCodec::Invalid:
Y_ABORT("unreachable");
}

return CreateS3ExportBuffer(std::move(bufferSettings));
}

void Shutdown() const override {}
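
The two call sites above rely on separate factory functions. Only CreateS3ExportBufferRaw is defined in the second file of this diff; the following is a plausible shape for both declarations, inferred from the call sites (illustrative only — the actual header with these declarations is not part of this diff, and parameter names for the Zstd variant are guesses):

// Inferred from the calls in CreateBuffer() above -- not taken from the real header.
NExportScan::IBuffer* CreateS3ExportBufferRaw(
    const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums);
NExportScan::IBuffer* CreateS3ExportBufferZstd(
    int compressionLevel, const IExport::TTableColumns& columns, ui64 rowsLimit,
    ui64 bytesLimit, ui64 minBytes, bool enableChecksums);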
244 changes: 25 additions & 219 deletions ydb/core/tx/datashard/export_s3_buffer.cpp
@@ -1,138 +1,31 @@
#ifndef KIKIMR_DISABLE_S3_OPS

#include "export_s3_buffer.h"
#include "export_s3_buffer_raw.h"
#include "type_serialization.h"

#include <ydb/core/backup/common/checksum.h>
#include <ydb/core/tablet_flat/flat_row_state.h>
#include <yql/essentials/types/binary_json/read.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>

#include <library/cpp/string_utils/quote/quote.h>

#include <util/datetime/base.h>
#include <util/generic/buffer.h>
#include <util/stream/buffer.h>

#include <contrib/libs/zstd/include/zstd.h>
namespace NKikimr {
namespace NDataShard {


namespace NKikimr::NDataShard {

namespace {

struct DestroyZCtx {
static void Destroy(::ZSTD_CCtx* p) noexcept {
ZSTD_freeCCtx(p);
}
};

class TZStdCompressionProcessor {
public:
using TPtr = THolder<TZStdCompressionProcessor>;

explicit TZStdCompressionProcessor(const TS3ExportBufferSettings::TCompressionSettings& settings);

TString GetError() const {
return ZSTD_getErrorName(ErrorCode);
}

bool AddData(TStringBuf data);

TMaybe<TBuffer> Flush(bool prepare);

private:
enum ECompressionResult {
CONTINUE,
DONE,
ERROR,
};

ECompressionResult Compress(ZSTD_inBuffer* input, ZSTD_EndDirective endOp);
void Reset();

private:
const int CompressionLevel;
THolder<::ZSTD_CCtx, DestroyZCtx> Context;
size_t ErrorCode = 0;
TBuffer Buffer;
ui64 BytesAdded = 0;
};

class TS3Buffer: public NExportScan::IBuffer {
using TTagToColumn = IExport::TTableColumns;
using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow

public:
explicit TS3Buffer(TS3ExportBufferSettings&& settings);

void ColumnsOrder(const TVector<ui32>& tags) override;
bool Collect(const NTable::IScan::TRow& row) override;
IEventBase* PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) override;
void Clear() override;
bool IsFilled() const override;
TString GetError() const override;

private:
inline ui64 GetRowsLimit() const { return RowsLimit; }
inline ui64 GetBytesLimit() const { return MaxBytes; }

bool Collect(const NTable::IScan::TRow& row, IOutputStream& out);
virtual TMaybe<TBuffer> Flush(bool prepare);

static NBackup::IChecksum* CreateChecksum(const TMaybe<TS3ExportBufferSettings::TChecksumSettings>& settings);
static TZStdCompressionProcessor* CreateCompression(const TMaybe<TS3ExportBufferSettings::TCompressionSettings>& settings);

private:
const TTagToColumn Columns;
const ui64 RowsLimit;
const ui64 MinBytes;
const ui64 MaxBytes;

TTagToIndex Indices;

protected:
ui64 Rows = 0;
ui64 BytesRead = 0;
TBuffer Buffer;

NBackup::IChecksum::TPtr Checksum;
TZStdCompressionProcessor::TPtr Compression;

TString ErrorString;
}; // TS3Buffer

TS3Buffer::TS3Buffer(TS3ExportBufferSettings&& settings)
: Columns(std::move(settings.Columns))
, RowsLimit(settings.MaxRows)
, MinBytes(settings.MinBytes)
, MaxBytes(settings.MaxBytes)
, Checksum(CreateChecksum(settings.ChecksumSettings))
, Compression(CreateCompression(settings.CompressionSettings))
TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
: Columns(columns)
, RowsLimit(rowsLimit)
, BytesLimit(bytesLimit)
, Rows(0)
, BytesRead(0)
, Checksum(enableChecksums ? NBackup::CreateChecksum() : nullptr)
{
}

NBackup::IChecksum* TS3Buffer::CreateChecksum(const TMaybe<TS3ExportBufferSettings::TChecksumSettings>& settings) {
if (settings) {
switch (settings->ChecksumType) {
case TS3ExportBufferSettings::TChecksumSettings::EChecksumType::Sha256:
return NBackup::CreateChecksum();
}
}
return nullptr;
}

TZStdCompressionProcessor* TS3Buffer::CreateCompression(const TMaybe<TS3ExportBufferSettings::TCompressionSettings>& settings) {
if (settings) {
switch (settings->Algorithm) {
case TS3ExportBufferSettings::TCompressionSettings::EAlgorithm::Zstd:
return new TZStdCompressionProcessor(*settings);
}
}
return nullptr;
}

void TS3Buffer::ColumnsOrder(const TVector<ui32>& tags) {
void TS3BufferRaw::ColumnsOrder(const TVector<ui32>& tags) {
Y_ABORT_UNLESS(tags.size() == Columns.size());

Indices.clear();
@@ -144,7 +37,7 @@ void TS3Buffer::ColumnsOrder(const TVector<ui32>& tags) {
}
}

bool TS3Buffer::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
bool needsComma = false;
for (const auto& [tag, column] : Columns) {
auto it = Indices.find(tag);
@@ -259,7 +152,7 @@ bool TS3Buffer::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
return true;
}

bool TS3Buffer::Collect(const NTable::IScan::TRow& row) {
bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) {
TBufferOutput out(Buffer);
ErrorString.clear();

@@ -268,24 +161,14 @@ bool TS3Buffer::Collect(const NTable::IScan::TRow& row) {
return false;
}

TStringBuf data(Buffer.Data(), Buffer.Size());
data = data.Tail(beforeSize);

// Apply checksum
if (Checksum) {
Checksum->AddData(data);
}

// Compress
if (Compression && !Compression->AddData(data)) {
ErrorString = Compression->GetError();
return false;
TStringBuf data(Buffer.Data(), Buffer.Size());
Checksum->AddData(data.Tail(beforeSize));
}

return true;
}

IEventBase* TS3Buffer::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) {
IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) {
stats.Rows = Rows;
stats.BytesRead = BytesRead;

@@ -303,108 +186,31 @@ IEventBase* TS3Buffer::PrepareEvent(bool last, NExportScan::IBuffer::TStats& sta
}
}

void TS3Buffer::Clear() {
void TS3BufferRaw::Clear() {
Y_ABORT_UNLESS(Flush(false));
}

bool TS3Buffer::IsFilled() const {
if (Buffer.Size() < MinBytes) {
return false;
}

bool TS3BufferRaw::IsFilled() const {
return Rows >= GetRowsLimit() || Buffer.Size() >= GetBytesLimit();
}

TString TS3Buffer::GetError() const {
TString TS3BufferRaw::GetError() const {
return ErrorString;
}

TMaybe<TBuffer> TS3Buffer::Flush(bool prepare) {
TMaybe<TBuffer> TS3BufferRaw::Flush(bool) {
Rows = 0;
BytesRead = 0;

// Compression finishes compression frame during Flush
// so that last table row borders equal to compression frame borders.
// This full finished block must then be encrypted so that encryption frame
// has the same borders.
// It allows to import data in batches and save its state during import.

if (Compression) {
TMaybe<TBuffer> compressedBuffer = Compression->Flush(prepare);
if (!compressedBuffer) {
return Nothing();
}

Buffer = std::move(*compressedBuffer);
}

return std::exchange(Buffer, TBuffer());
}

TZStdCompressionProcessor::TZStdCompressionProcessor(const TS3ExportBufferSettings::TCompressionSettings& settings)
: CompressionLevel(settings.CompressionLevel)
, Context(ZSTD_createCCtx())
NExportScan::IBuffer* CreateS3ExportBufferRaw(
const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
{
return new TS3BufferRaw(columns, rowsLimit, bytesLimit, enableChecksums);
}

bool TZStdCompressionProcessor::AddData(TStringBuf data) {
BytesAdded += data.size();
auto input = ZSTD_inBuffer{data.data(), data.size(), 0};
while (input.pos < input.size) {
if (ERROR == Compress(&input, ZSTD_e_continue)) {
return false;
}
}

return true;
}

TMaybe<TBuffer> TZStdCompressionProcessor::Flush(bool prepare) {
if (prepare && BytesAdded) {
ECompressionResult res;
auto input = ZSTD_inBuffer{NULL, 0, 0};

do {
if (res = Compress(&input, ZSTD_e_end); res == ERROR) {
return Nothing();
}
} while (res != DONE);
}

Reset();
return std::exchange(Buffer, TBuffer());
}

TZStdCompressionProcessor::ECompressionResult TZStdCompressionProcessor::Compress(ZSTD_inBuffer* input, ZSTD_EndDirective endOp) {
auto output = ZSTD_outBuffer{Buffer.Data(), Buffer.Capacity(), Buffer.Size()};
auto res = ZSTD_compressStream2(Context.Get(), &output, input, endOp);

if (ZSTD_isError(res)) {
ErrorCode = res;
return ERROR;
}

if (res > 0) {
Buffer.Reserve(output.pos + res);
}

Buffer.Proceed(output.pos);
return res ? CONTINUE : DONE;
}

void TZStdCompressionProcessor::Reset() {
BytesAdded = 0;
ZSTD_CCtx_reset(Context.Get(), ZSTD_reset_session_only);
ZSTD_CCtx_refCDict(Context.Get(), NULL);
ZSTD_CCtx_setParameter(Context.Get(), ZSTD_c_compressionLevel, CompressionLevel);
}

} // anonymous

NExportScan::IBuffer* CreateS3ExportBuffer(TS3ExportBufferSettings&& settings) {
return new TS3Buffer(std::move(settings));
}

} // namespace NKikimr::NDataShard
} // NDataShard
} // NKikimr

#endif // KIKIMR_DISABLE_S3_OPS
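
The comment in TS3Buffer::Flush() above (part of the code removed by this change) explains that each flushed batch must end its own compression frame, so that table row borders, compression frame borders, and encryption frame borders coincide and the import side can process batches independently. A minimal standalone sketch of that pattern with the zstd streaming API (illustrative only, not part of this change; output sizing simplified via ZSTD_compressBound):

#include <contrib/libs/zstd/include/zstd.h>

#include <stdexcept>
#include <string>

// Compress one batch into a self-contained zstd frame, so every flushed
// batch can later be decompressed independently of the others.
std::string CompressBatchAsFrame(ZSTD_CCtx* ctx, const std::string& batch, int level) {
    ZSTD_CCtx_reset(ctx, ZSTD_reset_session_only);
    ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, level);

    std::string out;
    out.resize(ZSTD_compressBound(batch.size()));

    ZSTD_inBuffer input{batch.data(), batch.size(), 0};
    ZSTD_outBuffer output{out.data(), out.size(), 0};

    // ZSTD_e_end consumes the remaining input, flushes, and writes the frame
    // epilogue; a return value of 0 means the frame is complete.
    size_t remaining = 0;
    do {
        remaining = ZSTD_compressStream2(ctx, &output, &input, ZSTD_e_end);
        if (ZSTD_isError(remaining)) {
            throw std::runtime_error(ZSTD_getErrorName(remaining));
        }
    } while (remaining != 0);

    out.resize(output.pos);
    return out;
}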