Skip to content

Commit 060c0d7

Browse files
author
Nong Li
committed
Better decoder management.
1 parent 9f1d702 commit 060c0d7

File tree

4 files changed

+136
-39
lines changed

4 files changed

+136
-39
lines changed

example/compute_stats.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ int main(int argc, char** argv) {
5050
InMemoryInputStream input(&column_buffer[0], column_buffer.size());
5151
ColumnReader reader(&metadata.schema[c + 1], &input);
5252
while (reader.HasNext()) {
53-
int32_t val;
54-
bool is_null = reader.GetInt32(&val);
55-
if (!is_null) cout << val << endl;
53+
int def_level, rep_level;
54+
int32_t val = reader.GetInt32(&def_level, &rep_level);;
55+
if (def_level >= rep_level) cout << val << endl;
5656
}
5757
}
5858
}

src/encodings.h

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,46 @@ class Decoder {
1313
public:
1414
virtual ~Decoder() {}
1515

16+
// Sets the data for a new page. This will be called multiple times on the same
17+
// decoder and should reset all internal state.
1618
virtual void SetData(int num_values, const uint8_t* data, int len) = 0;
1719

1820
// Subclasses should override the ones they support
19-
virtual bool GetBool() { return false; }
20-
virtual int32_t GetInt32() { return 0; }
21-
virtual int64_t GetInt64() { return 0; }
22-
virtual float GetFloat() { return 0; }
23-
virtual String GetString() { return String(); }
21+
virtual bool GetBool() {
22+
throw ParquetException("Decoder does not implement this type.");
23+
}
24+
virtual int32_t GetInt32() {
25+
throw ParquetException("Decoder does not implement this type.");
26+
}
27+
virtual int64_t GetInt64() {
28+
throw ParquetException("Decoder does not implement this type.");
29+
}
30+
virtual float GetFloat() {
31+
throw ParquetException("Decoder does not implement this type.");
32+
}
33+
virtual String GetString() {
34+
throw ParquetException("Decoder does not implement this type.");
35+
}
2436

25-
int value_left() const { return num_values_; }
37+
// Returns the number of values left (for the last call to SetData()). This is
38+
// the number of values left in this page.
39+
int values_left() const { return num_values_; }
40+
41+
const parquet::Encoding::type encoding() const { return encoding_; }
2642

2743
protected:
28-
Decoder(const parquet::SchemaElement* schema) : schema_(schema), num_values_(0) {}
44+
Decoder(const parquet::SchemaElement* schema, const parquet::Encoding::type& encoding)
45+
: schema_(schema), encoding_(encoding), num_values_(0) {}
46+
2947
const parquet::SchemaElement* schema_;
48+
const parquet::Encoding::type encoding_;
3049
int num_values_;
3150
};
3251

3352
class BoolDecoder : public Decoder {
3453
public:
35-
BoolDecoder(const parquet::SchemaElement* schema) : Decoder(schema) { }
54+
BoolDecoder(const parquet::SchemaElement* schema)
55+
: Decoder(schema, parquet::Encoding::PLAIN) { }
3656

3757
virtual void SetData(int num_values, const uint8_t* data, int len) {
3858
num_values_ = num_values;
@@ -41,7 +61,7 @@ class BoolDecoder : public Decoder {
4161

4262
virtual bool GetBool() {
4363
bool result;
44-
if (!decoder_.Get(&result)) throw "EOF";
64+
if (!decoder_.Get(&result)) ParquetException::EofException();
4565
--num_values_;
4666
return result;
4767
}
@@ -53,7 +73,7 @@ class BoolDecoder : public Decoder {
5373
class PlainDecoder : public Decoder {
5474
public:
5575
PlainDecoder(const parquet::SchemaElement* schema)
56-
: Decoder(schema), data_(NULL), len_(0) {
76+
: Decoder(schema, parquet::Encoding::PLAIN), data_(NULL), len_(0) {
5777
}
5878

5979
virtual void SetData(int num_values, const uint8_t* data, int len) {
@@ -63,7 +83,7 @@ class PlainDecoder : public Decoder {
6383
}
6484

6585
virtual int32_t GetInt32() {
66-
if (len_ < sizeof(int32_t)) throw "EOF";
86+
if (len_ < sizeof(int32_t)) ParquetException::EofException();
6787
int32_t val = *reinterpret_cast<const int32_t*>(data_);
6888
data_ += sizeof(int32_t);
6989
len_ -= sizeof(int32_t);
@@ -72,7 +92,7 @@ class PlainDecoder : public Decoder {
7292
}
7393

7494
virtual int64_t GetInt64() {
75-
if (len_ < sizeof(int64_t)) throw "EOF";
95+
if (len_ < sizeof(int64_t)) ParquetException::EofException();
7696
int64_t val = *reinterpret_cast<const int64_t*>(data_);
7797
data_ += sizeof(int64_t);
7898
len_ -= sizeof(int64_t);
@@ -81,7 +101,7 @@ class PlainDecoder : public Decoder {
81101
}
82102

83103
virtual float GetFloat() {
84-
if (len_ < sizeof(float)) throw "EOF";
104+
if (len_ < sizeof(float)) ParquetException::EofException();
85105
float val = *reinterpret_cast<const float*>(data_);
86106
data_ += sizeof(float);
87107
len_ -= sizeof(float);
@@ -91,11 +111,11 @@ class PlainDecoder : public Decoder {
91111

92112
virtual String GetString() {
93113
String result;
94-
if (len_ < sizeof(uint32_t)) throw "EOF";
114+
if (len_ < sizeof(uint32_t)) ParquetException::EofException();
95115
result.len = *reinterpret_cast<const uint32_t*>(data_);
96116
data_ += sizeof(uint32_t);
97117
len_ -= sizeof(uint32_t);
98-
if (len_ < result.len) throw "EOF";
118+
if (len_ < result.len) ParquetException::EofException();
99119
result.ptr = data_;
100120
data_ += result.len;
101121
len_ -= result.len;
@@ -111,10 +131,12 @@ class PlainDecoder : public Decoder {
111131
class DictionaryDecoder : public Decoder {
112132
public:
113133
DictionaryDecoder(const parquet::SchemaElement* schema, Decoder* dictionary)
114-
: Decoder(schema) {
115-
int num_dictionary_values = dictionary->value_left();
134+
: Decoder(schema, parquet::Encoding::RLE_DICTIONARY) {
135+
int num_dictionary_values = dictionary->values_left();
116136
switch (schema->type) {
117-
case parquet::Type::BOOLEAN: throw "Boolean cols should not be dictionary encoded.";
137+
case parquet::Type::BOOLEAN:
138+
throw ParquetException("Boolean cols should not be dictionary encoded.");
139+
118140
case parquet::Type::INT32:
119141
int32_dictionary_.resize(num_dictionary_values);
120142
for (int i = 0; i < num_dictionary_values; ++i) {
@@ -140,7 +162,7 @@ class DictionaryDecoder : public Decoder {
140162
}
141163
break;
142164
default:
143-
throw "NYI";
165+
ParquetException::NYI();
144166
}
145167
}
146168

@@ -161,7 +183,7 @@ class DictionaryDecoder : public Decoder {
161183
private:
162184
int index() {
163185
int idx;
164-
if (!idx_decoder_.Get(&idx)) throw "EOF";
186+
if (!idx_decoder_.Get(&idx)) ParquetException::EofException();
165187
--num_values_;
166188
return idx;
167189
}

src/parquet.cc

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
const int DATA_PAGE_SIZE = 64 * 1024;
1010

11+
using namespace boost;
1112
using namespace parquet;
1213
using namespace std;
1314

@@ -31,16 +32,22 @@ const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) {
3132
ColumnReader::ColumnReader(const SchemaElement* schema, InputStream* stream)
3233
: schema_(schema),
3334
stream_(stream),
34-
num_buffered_values_(0) {
35+
num_buffered_values_(0),
36+
current_decoder_(NULL) {
3537
}
3638

37-
bool ColumnReader::GetInt32(int32_t* result) {
39+
int32_t ColumnReader::GetInt32(int* definition_level, int* repetition_level) {
3840
--num_buffered_values_;
39-
int def_level;
40-
if (!definition_level_decoder_.Get(&def_level)) throw "EOF";
41-
if (def_level == 0) return true;
42-
*result = decoder_->GetInt32();
43-
return false;
41+
*repetition_level = 1;
42+
if (!definition_level_decoder_.Get(definition_level)) ParquetException::EofException();
43+
if (*definition_level == 0) return true;
44+
return current_decoder_->GetInt32();
45+
}
46+
47+
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
48+
// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
49+
// encoding.
50+
return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
4451
}
4552

4653
bool ColumnReader::ReadNewPage() {
@@ -54,18 +61,25 @@ bool ColumnReader::ReadNewPage() {
5461
return false;
5562
}
5663
stream_->Read(header_size, &bytes_read);
57-
cout << apache::thrift::ThriftDebugString(current_page_header_) << endl;
5864

5965
// TODO: handle decompression.
6066
int uncompressed_len = current_page_header_.uncompressed_page_size;
6167
buffer = stream_->Read(uncompressed_len, &bytes_read);
62-
if (bytes_read != uncompressed_len) throw "EOF";
68+
if (bytes_read != uncompressed_len) ParquetException::EofException();
6369

6470
if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
71+
unordered_map<Encoding::type, shared_ptr<Decoder> >::iterator it =
72+
decoders_.find(Encoding::RLE_DICTIONARY);
73+
if (it != decoders_.end()) {
74+
throw ParquetException("Column cannot have more than one dictionary.");
75+
}
76+
6577
PlainDecoder dictionary(schema_);
6678
dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
6779
buffer, uncompressed_len);
68-
decoder_.reset(new DictionaryDecoder(schema_, &dictionary));
80+
shared_ptr<Decoder> decoder(new DictionaryDecoder(schema_, &dictionary));
81+
decoders_[Encoding::RLE_DICTIONARY] = decoder;
82+
current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
6983
continue;
7084
} else if (current_page_header_.type == PageType::DATA_PAGE) {
7185
// Read a data page.
@@ -79,8 +93,36 @@ bool ColumnReader::ReadNewPage() {
7993

8094
// TODO: repetition levels
8195

82-
// Now buffer is at the start of the data.
83-
decoder_->SetData(num_buffered_values_, buffer,
96+
// Get a decoder object for this page or create a new decoder if this is the
97+
// first page with this encoding.
98+
Encoding::type encoding = current_page_header_.data_page_header.encoding;
99+
if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
100+
101+
unordered_map<Encoding::type, shared_ptr<Decoder> >::iterator it =
102+
decoders_.find(encoding);
103+
if (it != decoders_.end()) {
104+
current_decoder_ = it->second.get();
105+
} else {
106+
switch (encoding) {
107+
case Encoding::PLAIN: {
108+
shared_ptr<Decoder> decoder(new PlainDecoder(schema_));
109+
decoders_[encoding] = decoder;
110+
current_decoder_ = decoder.get();
111+
break;
112+
}
113+
case Encoding::RLE_DICTIONARY:
114+
throw ParquetException("Dictionary page must be before data page.");
115+
116+
case Encoding::DELTA_BINARY_PACKED:
117+
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
118+
case Encoding::DELTA_BYTE_ARRAY:
119+
ParquetException::NYI();
120+
121+
default:
122+
throw ParquetException("Unknown encoding type.");
123+
}
124+
}
125+
current_decoder_->SetData(num_buffered_values_, buffer,
84126
uncompressed_len - sizeof(uint32_t) - num_definition_bytes);
85127
} else {
86128
// We don't know what this page type is. just skip it.

src/parquet/parquet.h

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#ifndef PARQUET_PARQUET_H_
22
#define PARQUET_PARQUET_H_
33

4+
#include <exception>
45
#include <boost/cstdint.hpp>
6+
#include <boost/unordered_map.hpp>
57
#include "gen-cpp/parquet_constants.h"
68
#include "gen-cpp/parquet_types.h"
79

@@ -26,16 +28,45 @@ struct String {
2628
const uint8_t* ptr;
2729
};
2830

31+
class ParquetException : public std::exception {
32+
public:
33+
static void EofException() { throw ParquetException("Expected end of stream."); }
34+
static void NYI() { throw ParquetException("Not yet implemented."); }
35+
36+
explicit ParquetException(const char* msg) : msg_(msg) {}
37+
explicit ParquetException(const std::string& msg) : msg_(msg) {}
38+
39+
virtual ~ParquetException() throw() {}
40+
virtual const char* what() const throw() { return msg_.c_str(); }
41+
42+
private:
43+
std::string msg_;
44+
};
45+
46+
// Interface for the column reader to get the bytes. The interface is a stream
47+
// interface, meaning the bytes in order and once a byte is read, it does not
48+
// need to be read again.
2949
class InputStream {
3050
public:
31-
virtual ~InputStream() {}
51+
// Returns the next 'num_to_peek' without advancing the current position.
52+
// *num_bytes will contain the number of bytes returned which can only be
53+
// less than num_to_peek at end of stream cases.
54+
// Since the position is not advanced, calls to this function are idempotent.
55+
// The buffer returned to the caller is still owned by the input stream and must
56+
// stay valid until the next call to Peek() or Read().
3257
virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0;
58+
59+
// Identical to Peek(), except the current position in the stream is advanced by
60+
// *num_bytes.
3361
virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0;
3462

63+
virtual ~InputStream() {}
64+
3565
protected:
3666
InputStream() {}
3767
};
3868

69+
// Implementation of an InputStream when all the bytes are in memory.
3970
class InMemoryInputStream : public InputStream {
4071
public:
4172
InMemoryInputStream(const uint8_t* buffer, int64_t len);
@@ -48,12 +79,13 @@ class InMemoryInputStream : public InputStream {
4879
int64_t offset_;
4980
};
5081

82+
// API to read values from a single column.
5183
class ColumnReader {
5284
public:
5385
ColumnReader(const parquet::SchemaElement* schema, InputStream* stream);
54-
bool HasNext();
5586

56-
bool GetInt32(int32_t* result);
87+
bool HasNext();
88+
int32_t GetInt32(int* definition_level, int* repetition_level);
5789

5890
private:
5991
bool ReadNewPage();
@@ -64,7 +96,8 @@ class ColumnReader {
6496
parquet::PageHeader current_page_header_;
6597

6698
impala::RleDecoder definition_level_decoder_;
67-
boost::shared_ptr<Decoder> decoder_;
99+
boost::unordered_map<parquet::Encoding::type, boost::shared_ptr<Decoder> > decoders_;
100+
Decoder* current_decoder_;
68101
};
69102

70103
// Deserialize a thrift message from buf/len. buf/len must at least contain

0 commit comments

Comments
 (0)