Skip to content

Commit

Permalink
apacheGH-39921: [Go][Parquet] ColumnWriter not reset TotalCompressedB…
Browse files Browse the repository at this point in the history
…ytes after Flush (apache#39922)

### Rationale for this change

See apache#39921

### What changes are included in this PR?

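Stop resetting `totalCompressedBytes` to zero when `FlushBufferedDataPages` is called, so the counter keeps accumulating across flushes. `Close` now also returns the error from `WriteDictionaryPage` instead of ignoring it.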

### Are these changes tested?

Yes

### Are there any user-facing changes?

Yes, it's a bugfix

* Closes: apache#39921

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
mapleFU authored and thisisnic committed Mar 8, 2024
1 parent b281e81 commit 915f449
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
5 changes: 3 additions & 2 deletions go/parquet/file/column_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ func (w *columnWriter) FlushBufferedDataPages() (err error) {
}
}
w.pages = w.pages[:0]
w.totalCompressedBytes = 0
return
}

Expand Down Expand Up @@ -542,7 +541,9 @@ func (w *columnWriter) Close() (err error) {
if !w.closed {
w.closed = true
if w.hasDict && !w.fallbackToNonDict {
w.WriteDictionaryPage()
if err = w.WriteDictionaryPage(); err != nil {
return err
}
}

if err = w.FlushBufferedDataPages(); err != nil {
Expand Down
28 changes: 28 additions & 0 deletions go/parquet/file/column_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,26 @@ func (p *PrimitiveWriterTestSuite) testDictionaryFallbackEncoding(version parque
}
}

// testDictionaryFallbackAndCompressedSize writes SmallSize generated values
// through a dictionary-enabled column writer, forces a fallback to plain
// encoding, and asserts that the writer's byte counters are non-zero both
// right after the fallback and after the writer is closed.
//
// The dictionary encoding is selected from version: PlainDict for
// parquet.V1_0 and RLEDict for every other version.
func (p *PrimitiveWriterTestSuite) testDictionaryFallbackAndCompressedSize(version parquet.Version) {
	p.GenerateData(SmallSize)
	props := parquet.DefaultColumnProperties()
	props.DictionaryEnabled = true

	if version == parquet.V1_0 {
		props.Encoding = parquet.Encodings.PlainDict
	} else {
		props.Encoding = parquet.Encodings.RLEDict
	}

	writer := p.buildWriter(SmallSize, props, parquet.WithVersion(version))
	p.WriteBatchValues(writer, nil, nil)
	writer.FallbackToPlain()

	// Use NotZero instead of NotEqual(0, ...): NotEqual boxes the literal 0 as
	// an int and compares type-sensitively, so against a counter of any other
	// integer type it can never fail.
	// NOTE(review): the counters are presumably int64 — confirm against the
	// ColumnChunkWriter interface. NotZero is correct either way.
	p.NotZero(writer.TotalCompressedBytes())

	// A failed Close would make the counters below meaningless, so surface it.
	p.NoError(writer.Close())
	p.NotZero(writer.TotalCompressedBytes())
	p.NotZero(writer.TotalBytesWritten())
}

// TestRequiredPlain delegates to testRequiredWithEncoding using the Plain
// encoding.
func (p *PrimitiveWriterTestSuite) TestRequiredPlain() {
	enc := parquet.Encodings.Plain
	p.testRequiredWithEncoding(enc)
}
Expand Down Expand Up @@ -575,6 +595,14 @@ func (p *PrimitiveWriterTestSuite) TestDictionaryFallbackEncodingV2() {
p.testDictionaryFallbackEncoding(parquet.V2_LATEST)
}

// TestDictionaryFallbackStatsV1 runs the dictionary-fallback byte-counter
// check with the parquet.V1_0 format version.
func (p *PrimitiveWriterTestSuite) TestDictionaryFallbackStatsV1() {
	version := parquet.V1_0
	p.testDictionaryFallbackAndCompressedSize(version)
}

// TestDictionaryFallbackStatsV2 runs the dictionary-fallback byte-counter
// check with the parquet.V2_LATEST format version.
func (p *PrimitiveWriterTestSuite) TestDictionaryFallbackStatsV2() {
	version := parquet.V2_LATEST
	p.testDictionaryFallbackAndCompressedSize(version)
}

func (p *PrimitiveWriterTestSuite) TestOptionalNullValueChunk() {
// test case for NULL values
p.SetupSchema(parquet.Repetitions.Optional, 1)
Expand Down

0 comments on commit 915f449

Please sign in to comment.