
Commit f3446cf

Merge 88008e8 into a7f2434 (2 parents: a7f2434 + 88008e8)

File tree

10 files changed: +43 −2 lines

ydb/library/yql/providers/s3/compressors/zstd.cpp

Lines changed: 3 additions & 1 deletion

@@ -14,20 +14,22 @@ TReadBuffer::TReadBuffer(NDB::ReadBuffer& source)
     InBuffer.resize(8_KB);
     OutBuffer.resize(64_KB);
     Offset_ = InBuffer.size();
+    Size_ = InBuffer.size();
 }

 TReadBuffer::~TReadBuffer() {
     ::ZSTD_freeDStream(ZCtx_);
 }

 bool TReadBuffer::nextImpl() {
-    ::ZSTD_inBuffer zIn{InBuffer.data(), InBuffer.size(), Offset_};
+    ::ZSTD_inBuffer zIn{InBuffer.data(), Size_, Offset_};
     ::ZSTD_outBuffer zOut{OutBuffer.data(), OutBuffer.size(), 0ULL};

     size_t returnCode = 0ULL;
     if (!Finished_) do {
         if (zIn.pos == zIn.size) {
             zIn.size = Source_.read(InBuffer.data(), InBuffer.size());
+            Size_ = zIn.size;

             zIn.pos = Offset_ = 0;
             if (!zIn.size) {
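Why this matters: nextImpl() is called once per refill, but before this patch every call rebuilt zIn with InBuffer.size() (the 8 KB capacity) as its size. After any read that returned fewer bytes than the capacity, the next call would therefore tell zstd that stale bytes past the end of the valid data were still pending input. Caching the actual byte count in the new Size_ member keeps zIn.size honest across calls.

For orientation, a minimal standalone sketch of the invariant the patch restores: zIn.size must always be the number of bytes actually read, never the buffer capacity. This is editorial illustration, not YDB code; the std::istream source and the function name are assumptions.

    #include <zstd.h>
    #include <istream>
    #include <ostream>
    #include <stdexcept>
    #include <vector>

    void DecompressZstd(std::istream& in, std::ostream& out) {
        ZSTD_DStream* dctx = ZSTD_createDStream();
        if (!dctx) throw std::runtime_error("ZSTD_createDStream failed");
        ZSTD_initDStream(dctx);

        std::vector<char> inBuf(8 * 1024), outBuf(64 * 1024);
        ZSTD_inBuffer zIn{inBuf.data(), 0, 0};  // size = bytes actually read, not capacity

        for (;;) {
            if (zIn.pos == zIn.size) {          // input exhausted: refill
                in.read(inBuf.data(), static_cast<std::streamsize>(inBuf.size()));
                zIn.size = static_cast<size_t>(in.gcount());  // real count survives short reads
                zIn.pos = 0;
                if (zIn.size == 0) break;       // end of input (truncated frames not detected here)
            }
            ZSTD_outBuffer zOut{outBuf.data(), outBuf.size(), 0};
            const size_t rc = ZSTD_decompressStream(dctx, &zOut, &zIn);
            if (ZSTD_isError(rc)) {
                ZSTD_freeDStream(dctx);
                throw std::runtime_error(ZSTD_getErrorName(rc));
            }
            out.write(outBuf.data(), static_cast<std::streamsize>(zOut.pos));  // flush chunk
        }
        ZSTD_freeDStream(dctx);
    }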

ydb/library/yql/providers/s3/compressors/zstd.h

Lines changed: 2 additions & 1 deletion

@@ -19,7 +19,8 @@ class TReadBuffer : public NDB::ReadBuffer {
     NDB::ReadBuffer& Source_;
     std::vector<char> InBuffer, OutBuffer;
     ::ZSTD_DStream *const ZCtx_;
-    size_t Offset_;
+    size_t Offset_;
+    size_t Size_;
     bool Finished_ = false;
 };
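For reference, the decompressor's private state after this change, with the role of each field spelled out (the comments are editorial, inferred from zstd.cpp above):

    NDB::ReadBuffer& Source_;               // upstream stream of compressed bytes
    std::vector<char> InBuffer, OutBuffer;  // 8 KB compressed input, 64 KB decompressed output
    ::ZSTD_DStream *const ZCtx_;            // zstd streaming decompression context
    size_t Offset_;                         // consumed position within InBuffer (feeds zIn.pos)
    size_t Size_;                           // bytes actually held in InBuffer (feeds zIn.size)
    bool Finished_ = false;                 // set once the zstd stream is fully decoded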

(The remaining seven changed files are binary test fixtures, evidently the compressed big.json variants used by the new test below; the diff view does not render their contents. The three sizes it reports are 271, 386, and 168 bytes.)

ydb/tests/fq/s3/test_compressions.py

Lines changed: 35 additions & 0 deletions

@@ -62,6 +62,41 @@ def test_compression(self, kikimr, s3, client, filename, compression):
         result_set = data.result.result_set
         self.validate_result(result_set)

+    @yq_all
+    @pytest.mark.parametrize("filename, compression", [
+        ("big.json.gz", "gzip"),
+        ("big.json.lz4", "lz4"),
+        ("big.json.br", "brotli"),
+        ("big.json.bz2", "bzip2"),
+        ("big.json.zst", "zstd"),
+        ("big.json.xz", "xz")
+    ])
+    def test_big_compression(self, kikimr, s3, client, filename, compression, unique_prefix):
+        self.create_bucket_and_upload_file(filename, s3, kikimr)
+        storage_connection_name = unique_prefix + "fruitbucket"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = '''
+            SELECT count(*)
+            FROM `{}`.`{}`
+            WITH (format=json_each_row, compression="{}", SCHEMA (
+                a String NOT NULL
+            ));
+            '''.format(storage_connection_name, filename, compression)
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+        data = client.get_result_data(query_id)
+        result_set = data.result.result_set
+
+        logging.debug(str(result_set))
+        assert len(result_set.columns) == 1
+        assert result_set.columns[0].name == "column0"
+        assert result_set.columns[0].type.type_id == ydb.Type.UINT64
+        assert len(result_set.rows) == 1
+        assert result_set.rows[0].items[0].uint64_value == 5458
+
     @yq_all
     @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
     def test_invalid_compression(self, kikimr, s3, client):
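The "big" fixtures are the point of this test: decompressing them spans multiple nextImpl() refill cycles, exactly the code path the zstd.cpp fix corrects, and one that a file consumed in a single refill would not exercise. The expected count(*) of 5458 presumably matches the row count of the big.json source fixture.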
