Skip to content

Commit

Permalink
ClickHouse#14 stub for block's compression
Browse files Browse the repository at this point in the history
  • Loading branch information
artpaul committed Apr 25, 2017
1 parent e31ed05 commit 4287a74
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 45 deletions.
5 changes: 2 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CMAKE_MINIMUM_REQUIRED(VERSION 3.0.2)

INCLUDE (cmake/subdirs.cmake)
INCLUDE (cmake/cpp11.cmake)
INCLUDE (cmake/subdirs.cmake)

PROJECT (CLICKHOUSE-CLIENT)

Expand All @@ -17,8 +17,7 @@ PROJECT (CLICKHOUSE-CLIENT)
ENDIF ()

INCLUDE_DIRECTORIES(.)
INCLUDE_DIRECTORIES(contrib/cityhash)
INCLUDE_DIRECTORIES(contrib/lz4)
INCLUDE_DIRECTORIES(contrib)

SUBDIRS (
clickhouse
Expand Down
116 changes: 77 additions & 39 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#include "base/socket.h"
#include "columns/factory.h"

#include <cityhash/city.h>
#include <lz4/lz4.h>

#include <assert.h>
#include <atomic>
#include <system_error>
#include <vector>
Expand Down Expand Up @@ -71,6 +75,8 @@ class Client::Impl {

bool SendHello();

bool ReadBlock(Block* block, CodedInputStream* input);

bool ReceiveHello();

/// Reads data packet form input stream.
Expand All @@ -79,6 +85,8 @@ class Client::Impl {
/// Reads exception packet form input stream.
bool ReceiveException(bool rethrow = false);

void WriteBlock(const Block& block, CodedOutputStream* output);

private:
void Disconnect() {
socket_.Close();
Expand Down Expand Up @@ -110,6 +118,7 @@ class Client::Impl {
const ClientOptions options_;
QueryEvents* events_;
uint64_t query_id_;
int compression_ = CompressionState::Disable;

SocketHolder socket_;

Expand Down Expand Up @@ -145,9 +154,14 @@ Client::Impl::Impl(const ClientOptions& opts)
if (socket_.Closed()) {
throw std::system_error(errno, std::system_category());
}

if (!Handshake()) {
throw std::runtime_error("fail to connect to " + options_.host);
}

if (options_.compression_method != CompressionMethod::None) {
compression_ = CompressionState::Enable;
}
}

Client::Impl::~Impl() {
Expand Down Expand Up @@ -309,33 +323,26 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
return false;
}

bool Client::Impl::ReceiveData() {
if (REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
std::string table_name;

if (!WireFormat::ReadString(&input_, &table_name)) {
return false;
}
}
bool Client::Impl::ReadBlock(Block* block, CodedInputStream* input) {
// Additional information about block.
if (REVISION >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
uint64_t num;
BlockInfo info;

// BlockInfo
if (!WireFormat::ReadUInt64(&input_, &num)) {
if (!WireFormat::ReadUInt64(input, &num)) {
return false;
}
if (!WireFormat::ReadFixed(&input_, &info.is_overflows)) {
if (!WireFormat::ReadFixed(input, &info.is_overflows)) {
return false;
}
if (!WireFormat::ReadUInt64(&input_, &num)) {
if (!WireFormat::ReadUInt64(input, &num)) {
return false;
}
if (!WireFormat::ReadFixed(&input_, &info.bucket_num)) {
if (!WireFormat::ReadFixed(input, &info.bucket_num)) {
return false;
}
if (!WireFormat::ReadUInt64(&input_, &num)) {
if (!WireFormat::ReadUInt64(input, &num)) {
return false;
}

Expand All @@ -345,37 +352,58 @@ bool Client::Impl::ReceiveData() {
uint64_t num_columns = 0;
uint64_t num_rows = 0;

if (!WireFormat::ReadUInt64(&input_, &num_columns)) {
if (!WireFormat::ReadUInt64(input, &num_columns)) {
return false;
}
if (!WireFormat::ReadUInt64(&input_, &num_rows)) {
if (!WireFormat::ReadUInt64(input, &num_rows)) {
return false;
}

Block block(num_columns, num_rows);

for (size_t i = 0; i < num_columns; ++i) {
std::string name;
std::string type;

if (!WireFormat::ReadString(&input_, &name)) {
if (!WireFormat::ReadString(input, &name)) {
return false;
}
if (!WireFormat::ReadString(&input_, &type)) {
if (!WireFormat::ReadString(input, &type)) {
return false;
}

if (ColumnRef col = CreateColumnByType(type)) {
if (num_rows && !col->Load(&input_, num_rows)) {
if (num_rows && !col->Load(input, num_rows)) {
throw std::runtime_error("can't load");
}

block.AppendColumn(name, col);
block->AppendColumn(name, col);
} else {
throw std::runtime_error(std::string("unsupported column type: ") + type);
}
}

return true;
}

bool Client::Impl::ReceiveData() {
Block block;

if (REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
std::string table_name;

if (!WireFormat::ReadString(&input_, &table_name)) {
return false;
}
}

if (compression_ == CompressionState::Enable) {
// TODO
assert(false);
} else {
if (!ReadBlock(&block, &input_)) {
return false;
}
}

if (events_) {
events_->OnData(block);
}
Expand Down Expand Up @@ -464,7 +492,7 @@ void Client::Impl::SendQuery(const std::string& query) {
WireFormat::WriteString(&output_, std::string());

WireFormat::WriteUInt64(&output_, Stages::Complete);
WireFormat::WriteUInt64(&output_, CompressionState::Disable);
WireFormat::WriteUInt64(&output_, compression_);
WireFormat::WriteString(&output_, query);
// Send empty block as marker of
// end of data
Expand All @@ -473,30 +501,40 @@ void Client::Impl::SendQuery(const std::string& query) {
output_.Flush();
}

void Client::Impl::SendData(const Block& block) {
WireFormat::WriteUInt64(&output_, ClientCodes::Data);

if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
WireFormat::WriteString(&output_, std::string());
}

/// Дополнительная информация о блоке.
void Client::Impl::WriteBlock(const Block& block, CodedOutputStream* output) {
// Additional information about block.
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
WireFormat::WriteUInt64(&output_, 1);
WireFormat::WriteFixed (&output_, block.Info().is_overflows);
WireFormat::WriteUInt64(&output_, 2);
WireFormat::WriteFixed (&output_, block.Info().bucket_num);
WireFormat::WriteUInt64(&output_, 0);
WireFormat::WriteUInt64(output, 1);
WireFormat::WriteFixed (output, block.Info().is_overflows);
WireFormat::WriteUInt64(output, 2);
WireFormat::WriteFixed (output, block.Info().bucket_num);
WireFormat::WriteUInt64(output, 0);
}

WireFormat::WriteUInt64(&output_, block.GetColumnCount());
WireFormat::WriteUInt64(&output_, block.GetRowCount());
WireFormat::WriteUInt64(output, block.GetColumnCount());
WireFormat::WriteUInt64(output, block.GetRowCount());

for (Block::Iterator bi(block); bi.IsValid(); bi.Next()) {
WireFormat::WriteString(&output_, bi.Name());
WireFormat::WriteString(&output_, bi.Type()->GetName());
WireFormat::WriteString(output, bi.Name());
WireFormat::WriteString(output, bi.Type()->GetName());

bi.Column()->Save(output);
}
}

void Client::Impl::SendData(const Block& block) {
WireFormat::WriteUInt64(&output_, ClientCodes::Data);

if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
WireFormat::WriteString(&output_, std::string());
}

bi.Column()->Save(&output_);
if (compression_ == CompressionState::Enable) {
// TODO
assert(false);
} else {
WriteBlock(block, &output_);
}

output_.Flush();
Expand Down
9 changes: 9 additions & 0 deletions clickhouse/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@

namespace clickhouse {

/// Methods of block compression.
enum class CompressionMethod {
None = -1,
LZ4 = 1,
};

struct ClientOptions {
#define DECLARE_FIELD(name, type, setter, default) \
type name = default; \
Expand All @@ -40,6 +46,9 @@ struct ClientOptions {
/// enable throwing exceptions with standard c++ exception mechanism.
DECLARE_FIELD(rethrow_exceptions, bool, SetRethrowException, true);

/// Compression method.
DECLARE_FIELD(compression_method, CompressionMethod, SetCompressionMethod, CompressionMethod::None);

#undef DECLARE_FIELD
};

Expand Down
2 changes: 1 addition & 1 deletion contrib/cityhash/city.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
// possible hash functions, by using SIMD instructions, or by
// compromising on hash quality.

#include <city.h>
#include "city.h"

#include <algorithm>
#include <string.h> // for memcpy and memset
Expand Down
2 changes: 1 addition & 1 deletion contrib/lz4/lz4.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
/**************************************
* Includes
**************************************/
#include <lz4.h>
#include "lz4.h"


/**************************************
Expand Down
2 changes: 1 addition & 1 deletion contrib/lz4/lz4hc.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static const int LZ4HC_compressionLevel_default = 9;
/**************************************
* Includes
**************************************/
#include <lz4hc.h>
#include "lz4hc.h"


/**************************************
Expand Down

0 comments on commit 4287a74

Please sign in to comment.