Skip to content

Commit f979b15

Browse files
author
Nong Li
committed
Implement some encodings.
1 parent f1b987e commit f979b15

File tree

4 files changed

+86
-29
lines changed

4 files changed

+86
-29
lines changed

src/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
1615
add_library(Parquet STATIC
1716
parquet.cc
1817
)

src/encodings.h

Lines changed: 83 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,38 @@ class Decoder {
2222
virtual float GetFloat() { return 0; }
2323
virtual String GetString() { return String(); }
2424

25+
int value_left() const { return num_values_; }
26+
2527
protected:
26-
Decoder(parquet::SchemaElement* schema) : schema_(schema) {}
28+
Decoder(parquet::SchemaElement* schema, int num_values)
29+
: schema_(schema), num_values_(num_values) {}
2730
parquet::SchemaElement* schema_;
31+
int num_values_;
2832
};
2933

3034
class BoolDecoder : public Decoder {
3135
public:
32-
BoolDecoder(parquet::SchemaElement* schema) : Decoder(schema) { }
36+
BoolDecoder(parquet::SchemaElement* schema, int num_values) : Decoder(schema, num_values) { }
3337

3438
virtual void SetData(const uint8_t* data, int len) {
39+
decoder_ = impala::RleDecoder(data, len, 1);
40+
}
41+
42+
virtual bool GetBool() {
43+
bool result;
44+
if (!decoder_.Get(&result)) throw "EOF";
45+
--num_values_;
46+
return result;
3547
}
3648

37-
virtual bool GetBool() {
38-
return false;
39-
}
49+
private:
50+
impala::RleDecoder decoder_;
4051
};
4152

4253
class PlainDecoder : public Decoder {
4354
public:
44-
PlainDecoder(parquet::SchemaElement* schema)
45-
: Decoder(schema), data_(NULL), len_(0) {
55+
PlainDecoder(parquet::SchemaElement* schema, int num_values)
56+
: Decoder(schema, num_values), data_(NULL), len_(0) {
4657
}
4758

4859
virtual void SetData(const uint8_t* data, int len) {
@@ -51,39 +62,43 @@ class PlainDecoder : public Decoder {
5162
}
5263

5364
virtual int32_t GetInt32() {
54-
if (len_ < sizeof(int32_t)) throw "Bad";
65+
if (len_ < sizeof(int32_t)) throw "EOF";
5566
int32_t val = *reinterpret_cast<const int32_t*>(data_);
5667
data_ += sizeof(int32_t);
5768
len_ -= sizeof(int32_t);
69+
--num_values_;
5870
return val;
5971
}
6072

6173
virtual int64_t GetInt64() {
62-
if (len_ < sizeof(int64_t)) throw "Bad";
74+
if (len_ < sizeof(int64_t)) throw "EOF";
6375
int64_t val = *reinterpret_cast<const int64_t*>(data_);
6476
data_ += sizeof(int64_t);
6577
len_ -= sizeof(int64_t);
78+
--num_values_;
6679
return val;
6780
}
6881

6982
virtual float GetFloat() {
70-
if (len_ < sizeof(float)) throw "Bad";
83+
if (len_ < sizeof(float)) throw "EOF";
7184
float val = *reinterpret_cast<const float*>(data_);
7285
data_ += sizeof(float);
7386
len_ -= sizeof(float);
87+
--num_values_;
7488
return val;
7589
}
7690

7791
virtual String GetString() {
7892
String result;
79-
if (len_ < sizeof(uint32_t)) throw "Bad";
93+
if (len_ < sizeof(uint32_t)) throw "EOF";
8094
result.len = *reinterpret_cast<const uint32_t*>(data_);
8195
data_ += sizeof(uint32_t);
8296
len_ -= sizeof(uint32_t);
83-
if (len_ < result.len) throw "Bad";
97+
if (len_ < result.len) throw "EOF";
8498
result.ptr = data_;
8599
data_ += result.len;
86100
len_ -= result.len;
101+
--num_values_;
87102
return result;
88103
}
89104

@@ -94,25 +109,68 @@ class PlainDecoder : public Decoder {
94109

95110
class DictionaryDecoder : public Decoder {
96111
public:
97-
DictionaryDecoder(parquet::SchemaElement* schema, Decoder* dictionary)
98-
: Decoder(schema) {
112+
DictionaryDecoder(parquet::SchemaElement* schema, int num_values, Decoder* dictionary)
113+
: Decoder(schema, num_values) {
114+
int num_dictionary_values = dictionary->value_left();
115+
switch (schema->type) {
116+
case parquet::Type::BOOLEAN: throw "Boolean cols should not be dictionary encoded.";
117+
case parquet::Type::INT32:
118+
int32_dictionary_.resize(num_dictionary_values);
119+
for (int i = 0; i < num_dictionary_values; ++i) {
120+
int32_dictionary_[i] = dictionary->GetInt32();
121+
}
122+
break;
123+
case parquet::Type::INT64:
124+
int64_dictionary_.resize(num_dictionary_values);
125+
for (int i = 0; i < num_dictionary_values; ++i) {
126+
int64_dictionary_[i] = dictionary->GetInt64();
127+
}
128+
break;
129+
case parquet::Type::FLOAT:
130+
float_dictionary_.resize(num_dictionary_values);
131+
for (int i = 0; i < num_dictionary_values; ++i) {
132+
float_dictionary_[i] = dictionary->GetFloat();
133+
}
134+
break;
135+
case parquet::Type::BYTE_ARRAY:
136+
string_dictionary_.resize(num_dictionary_values);
137+
for (int i = 0; i < num_dictionary_values; ++i) {
138+
string_dictionary_[i] = dictionary->GetString();
139+
}
140+
break;
141+
default:
142+
throw "NYI";
143+
}
99144
}
100145

101146
virtual void SetData(const uint8_t* data, int len) {
147+
if (len == 0) return;
148+
uint8_t bit_width = *data;
149+
++data;
150+
--len;
151+
idx_decoder_ = impala::RleDecoder(data, len, bit_width);
102152
}
103153

104-
virtual int32_t GetInt32() {
105-
return 0;
106-
}
107-
virtual int64_t GetInt64() {
108-
return 0;
109-
}
110-
virtual float GetFloat() {
111-
return 0;
112-
}
113-
virtual String GetString() {
114-
return String();
154+
virtual int32_t GetInt32() { return int32_dictionary_[index()]; }
155+
virtual int64_t GetInt64() { return int64_dictionary_[index()]; }
156+
virtual float GetFloat() { return float_dictionary_[index()]; }
157+
virtual String GetString() { return string_dictionary_[index()]; }
158+
159+
private:
160+
int index() {
161+
int idx;
162+
if (!idx_decoder_.Get(&idx)) throw "EOF";
163+
--num_values_;
164+
return idx;
115165
}
166+
167+
// Only one of these dictionaries is populated, selected by the schema's type.
168+
std::vector<int32_t> int32_dictionary_;
169+
std::vector<int64_t> int64_dictionary_;
170+
std::vector<float> float_dictionary_;
171+
std::vector<String> string_dictionary_;
172+
173+
impala::RleDecoder idx_decoder_;
116174
};
117175

118176
}

src/impala/bit-stream-utils.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class BitWriter {
9292
class BitReader {
9393
public:
9494
// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'.
95-
BitReader(uint8_t* buffer, int buffer_len) :
95+
BitReader(const uint8_t* buffer, int buffer_len) :
9696
buffer_(buffer),
9797
max_bytes_(buffer_len),
9898
byte_offset_(0),
@@ -127,7 +127,7 @@ class BitReader {
127127
static const int MAX_VLQ_BYTE_LEN = 5;
128128

129129
private:
130-
uint8_t* buffer_;
130+
const uint8_t* buffer_;
131131
int max_bytes_;
132132

133133
// Bytes are memcpy'd from buffer_ and values are read from this variable. This is

src/impala/rle-encoding.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class RleDecoder {
7979
public:
8080
// Create a decoder object. buffer/buffer_len is the encoded data to be decoded.
8181
// bit_width is the width of each value (before encoding).
82-
RleDecoder(uint8_t* buffer, int buffer_len, int bit_width)
82+
RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
8383
: bit_reader_(buffer, buffer_len),
8484
bit_width_(bit_width),
8585
current_value_(0),

0 commit comments

Comments
 (0)