Skip to content

Commit 674a392

Browse files
author
Nong Li
committed
Read a dictionary encoded int column.
1 parent f979b15 commit 674a392

File tree

4 files changed

+57
-21
lines changed

4 files changed

+57
-21
lines changed

example/compute_stats.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ int main(int argc, char** argv) {
4949
}
5050
InMemoryInputStream input(&column_buffer[0], column_buffer.size());
5151
ColumnReader reader(&metadata.schema[c + 1], &input);
52-
printf("%d\n", reader.HasNext());
52+
while (reader.HasNext()) {
53+
int32_t val;
54+
bool is_null = reader.GetInt32(&val);
55+
if (!is_null) cout << val << endl;
56+
}
5357
}
5458
}
5559
fclose(file);

src/encodings.h

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class Decoder {
1313
public:
1414
virtual ~Decoder() {}
1515

16-
virtual void SetData(const uint8_t* data, int len) = 0;
16+
virtual void SetData(int num_values, const uint8_t* data, int len) = 0;
1717

1818
// Subclasses should override the ones they support
1919
virtual bool GetBool() { return false; }
@@ -25,17 +25,17 @@ class Decoder {
2525
int value_left() const { return num_values_; }
2626

2727
protected:
28-
Decoder(parquet::SchemaElement* schema, int num_values)
29-
: schema_(schema), num_values_(num_values) {}
30-
parquet::SchemaElement* schema_;
28+
Decoder(const parquet::SchemaElement* schema) : schema_(schema), num_values_(0) {}
29+
const parquet::SchemaElement* schema_;
3130
int num_values_;
3231
};
3332

3433
class BoolDecoder : public Decoder {
3534
public:
36-
BoolDecoder(parquet::SchemaElement* schema, int num_values) : Decoder(schema, num_values) { }
35+
BoolDecoder(const parquet::SchemaElement* schema) : Decoder(schema) { }
3736

38-
virtual void SetData(const uint8_t* data, int len) {
37+
virtual void SetData(int num_values, const uint8_t* data, int len) {
38+
num_values_ = num_values;
3939
decoder_ = impala::RleDecoder(data, len, 1);
4040
}
4141

@@ -52,11 +52,12 @@ class BoolDecoder : public Decoder {
5252

5353
class PlainDecoder : public Decoder {
5454
public:
55-
PlainDecoder(parquet::SchemaElement* schema, int num_values)
56-
: Decoder(schema, num_values), data_(NULL), len_(0) {
55+
PlainDecoder(const parquet::SchemaElement* schema)
56+
: Decoder(schema), data_(NULL), len_(0) {
5757
}
5858

59-
virtual void SetData(const uint8_t* data, int len) {
59+
virtual void SetData(int num_values, const uint8_t* data, int len) {
60+
num_values_ = num_values;
6061
data_ = data;
6162
len_ = len;
6263
}
@@ -109,8 +110,8 @@ class PlainDecoder : public Decoder {
109110

110111
class DictionaryDecoder : public Decoder {
111112
public:
112-
DictionaryDecoder(parquet::SchemaElement* schema, int num_values, Decoder* dictionary)
113-
: Decoder(schema, num_values) {
113+
DictionaryDecoder(const parquet::SchemaElement* schema, Decoder* dictionary)
114+
: Decoder(schema) {
114115
int num_dictionary_values = dictionary->value_left();
115116
switch (schema->type) {
116117
case parquet::Type::BOOLEAN: throw "Boolean cols should not be dictionary encoded.";
@@ -143,7 +144,8 @@ class DictionaryDecoder : public Decoder {
143144
}
144145
}
145146

146-
virtual void SetData(const uint8_t* data, int len) {
147+
virtual void SetData(int num_values, const uint8_t* data, int len) {
148+
num_values_ = num_values;
147149
if (len == 0) return;
148150
uint8_t bit_width = *data;
149151
++data;

src/parquet.cc

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,37 +34,62 @@ ColumnReader::ColumnReader(const SchemaElement* schema, InputStream* stream)
3434
buffered_bytes_offset_ = 0;
3535
}
3636

37+
bool ColumnReader::GetInt32(int32_t* result) {
38+
--num_buffered_values_;
39+
int def_level;
40+
if (!definition_level_decoder_.Get(&def_level)) throw "EOF";
41+
if (def_level == 0) return true;
42+
*result = decoder_->GetInt32();
43+
return false;
44+
}
45+
3746
bool ColumnReader::ReadNewPage() {
3847
uint32_t bytes_read = stream_->Read(&buffered_bytes_[0], buffered_bytes_.size());
48+
if (bytes_read == 0) return false;
3949
uint32_t header_size = bytes_read;
4050
if (!DeserializeThriftMsg(&buffered_bytes_[0], &header_size, &current_page_header_)) {
4151
return false;
4252
}
4353
buffered_bytes_offset_ += header_size;
4454
// TODO: handle decompression.
55+
cout << apache::thrift::ThriftDebugString(current_page_header_) << endl;
4556

4657
if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
4758
InitDictionary();
48-
// return ReadNewPage();
59+
header_size = bytes_read - buffered_bytes_offset_;
60+
if (!DeserializeThriftMsg(&buffered_bytes_[buffered_bytes_offset_],
61+
&header_size, &current_page_header_)) {
62+
return false;
63+
}
64+
cout << apache::thrift::ThriftDebugString(current_page_header_) << endl;
65+
buffered_bytes_offset_ += header_size;
4966
}
50-
67+
int num_definition_bytes = *reinterpret_cast<uint32_t*>(&buffered_bytes_[buffered_bytes_offset_]);
68+
buffered_bytes_offset_ += 4;
69+
definition_level_decoder_ =
70+
impala::RleDecoder(&buffered_bytes_[buffered_bytes_offset_], num_definition_bytes, 1);
71+
buffered_bytes_offset_ += num_definition_bytes;
72+
num_buffered_values_ = current_page_header_.data_page_header.num_values;
73+
decoder_->SetData(num_buffered_values_,
74+
&buffered_bytes_[buffered_bytes_offset_], bytes_read - buffered_bytes_offset_);
5175
return true;
5276
}
5377

5478
void ColumnReader::InitDictionary() {
55-
for (int i = 0; i < current_page_header_.dictionary_page_header.num_values; ++i) {
56-
uint8_t* data = &buffered_bytes_[buffered_bytes_offset_];
57-
cout << *reinterpret_cast<int32_t*>(data) << endl;
58-
buffered_bytes_offset_ += sizeof(int32_t);
59-
}
79+
uint8_t* data = &buffered_bytes_[buffered_bytes_offset_];
80+
int len = current_page_header_.uncompressed_page_size;
81+
buffered_bytes_offset_ += len;
82+
PlainDecoder dictionary(schema_);
83+
dictionary.SetData(current_page_header_.dictionary_page_header.num_values, data, len);
84+
decoder_.reset(new DictionaryDecoder(schema_, &dictionary));
6085
}
6186

6287
bool ColumnReader::HasNext() {
6388
if (num_buffered_values_ == 0) {
6489
ReadNewPage();
6590
if (num_buffered_values_ == 0) return false;
6691
}
67-
return false;
92+
return true;
6893
}
6994

7095
}

src/parquet/parquet.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include "gen-cpp/parquet_constants.h"
66
#include "gen-cpp/parquet_types.h"
77

8+
#include "impala/rle-encoding.h"
9+
810
// TCompactProtocol requires some #defines to work right.
911
#define SIGNED_RIGHT_SHIFT_IS 1
1012
#define ARITHMETIC_RIGHT_SHIFT 1
@@ -49,6 +51,8 @@ class ColumnReader {
4951
ColumnReader(const parquet::SchemaElement* schema, InputStream* stream);
5052
bool HasNext();
5153

54+
bool GetInt32(int32_t* result);
55+
5256
private:
5357
bool ReadNewPage();
5458
void InitDictionary();
@@ -61,6 +65,7 @@ class ColumnReader {
6165
size_t num_buffered_bytes_;
6266
size_t buffered_bytes_offset_;
6367

68+
impala::RleDecoder definition_level_decoder_;
6469
boost::shared_ptr<Decoder> decoder_;
6570
};
6671

0 commit comments

Comments
 (0)