Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions clickhouse/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ const BlockInfo& Block::Info() const {
return info_;
}

/// Set block info
void Block::SetInfo(BlockInfo info) {
info_ = std::move(info);
}

/// Count of rows in the block.
size_t Block::GetRowCount() const {
return rows_;
Expand Down
3 changes: 3 additions & 0 deletions clickhouse/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class Block {

const BlockInfo& Info() const;

/// Set block info
void SetInfo(BlockInfo info);

/// Count of rows in the block.
size_t GetRowCount() const;

Expand Down
14 changes: 12 additions & 2 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420

#define REVISION DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
#define REVISION DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO

namespace clickhouse {

Expand Down Expand Up @@ -408,6 +409,15 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
return false;
}
}
if (REVISION >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
{
if (!WireFormat::ReadUInt64(*input_, &info.written_rows)) {
return false;
}
if (!WireFormat::ReadUInt64(*input_, &info.written_bytes)) {
return false;
}
}

if (events_) {
events_->OnProgress(info);
Expand Down Expand Up @@ -475,7 +485,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
return false;
}

// TODO use data
block->SetInfo(std::move(info));
}

uint64_t num_columns = 0;
Expand Down
2 changes: 2 additions & 0 deletions clickhouse/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct Progress {
uint64_t rows = 0;
uint64_t bytes = 0;
uint64_t total_rows = 0;
uint64_t written_rows = 0;
uint64_t written_bytes = 0;
};


Expand Down
29 changes: 29 additions & 0 deletions ut/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,35 @@ TEST_P(ClientCase, RoundtripArrayTString) {
EXPECT_TRUE(CompareRecursive(*array, *result_typed));
}

TEST_P(ClientCase, OnProgress) {
Block block;
createTableWithOneColumn<ColumnString>(block);

std::optional<Progress> received_progress;
Query query("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')" );
query.OnProgress([&](const Progress& progress) {
received_progress = progress;
});
client_->Execute(query);

ASSERT_TRUE(received_progress.has_value());

EXPECT_GE(received_progress->rows, 0u);
EXPECT_LE(received_progress->rows, 2u);

EXPECT_GE(received_progress->bytes, 0u);
EXPECT_LE(received_progress->bytes, 10000u);

EXPECT_GE(received_progress->total_rows, 0u);
EXPECT_LE(received_progress->total_rows, 2u);

EXPECT_GE(received_progress->written_rows, 0u);
EXPECT_LE(received_progress->written_rows, 2u);

EXPECT_GE(received_progress->written_bytes, 0u);
EXPECT_LE(received_progress->written_bytes, 10000u);
}

const auto LocalHostEndpoint = ClientOptions()
.SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost"))
.SetPort( getEnvOrDefault<size_t>("CLICKHOUSE_PORT", "9000"))
Expand Down