zstd: Add true concurrent stream encoding #1136

Open: klauspost wants to merge 5 commits into master from add-concurrent-encoding
Conversation

@klauspost (Owner) commented Mar 20, 2026

Adds true concurrent stream encoding:

```go
// WithConcurrentBlocks enables job-based parallel compression for streams.
// When enabled and concurrent > 1, input is split into large sections (jobs)
// that are compressed simultaneously by multiple goroutines.
// Each non-first job receives an overlap prefix from the previous job for match context.
// Output is flushed in order, producing a valid single-frame zstd stream.
//
// This is incompatible with dictionary encoding.
// Cannot be changed with ResetWithOptions.
func WithConcurrentBlocks(b bool) EOption {
```

Example:

| Level   | 1 thread   | 4 threads          | 16 threads          | 1T ratio | 16T ratio |
|---------|:----------:|:------------------:|:-------------------:|:--------:|:---------:|
| fastest | 783 MB/s   | 2950 MB/s (3.8×)   | 6939 MB/s (8.9×)    | 12.24%   | 12.26%    |
| default | 728 MB/s   | 2533 MB/s (3.5×)   | 5340 MB/s (7.3×)    | 10.67%   | 10.68%    |
| better  | 434 MB/s   | 1105 MB/s (2.5×)   | 2206 MB/s (5.1×)    | 9.14%    | 9.21%     |
| best    | 129 MB/s   | 367 MB/s (2.8×)    | 884 MB/s (6.8×)     | 8.48%    | 8.63%     |
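The job-splitting scheme in the option doc (fixed-size jobs, each non-first job carrying an overlap prefix from the preceding bytes for match context) can be sketched with plain slicing. `job`, `splitJobs`, and the sizes below are illustrative, not the PR's actual types or defaults:

```go
package main

import "fmt"

// job is an illustrative stand-in for the PR's job type.
type job struct {
	seq    int
	prefix []byte // match-context overlap from the previous job
	input  []byte
}

// splitJobs splits data into jobs of at most jobSize bytes. Each job after
// the first also records up to overlap bytes of the preceding input as a
// prefix, which the compressor can use as match history.
func splitJobs(data []byte, jobSize, overlap int) []job {
	var jobs []job
	for off := 0; off < len(data); off += jobSize {
		end := off + jobSize
		if end > len(data) {
			end = len(data)
		}
		j := job{seq: len(jobs), input: data[off:end]}
		if off > 0 {
			p := off - overlap
			if p < 0 {
				p = 0
			}
			j.prefix = data[p:off]
		}
		jobs = append(jobs, j)
	}
	return jobs
}

func main() {
	jobs := splitJobs(make([]byte, 10), 4, 2)
	for _, j := range jobs {
		fmt.Println(j.seq, len(j.prefix), len(j.input))
	}
	// → 0 0 4
	//   1 2 4
	//   2 2 2
}
```

Note the slices alias the original buffer, so a worker reading `prefix` sees exactly the bytes the previous job compressed, without copying.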

Summary by CodeRabbit

  • New Features
    • Parallel, job-based compression mode (WithConcurrentBlocks) for higher throughput; streaming APIs (Write, ReadFrom, Flush, Close) use the concurrent path when enabled.
  • New Behavior
    • Encoders can be seeded from an explicit prefix to improve match-finding; prefix seeding is unavailable for dictionary-backed encoders.
  • Tests
    • Added extensive tests and benchmarks covering concurrent-blocks, edge cases, regressions, and stress scenarios.
  • Documentation
    • Added “Parallel Stream Compression” docs and usage notes.

@coderabbitai (Contributor) coderabbitai bot commented Mar 20, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 5b1e1386-4501-4d37-9084-965979ee77de

📥 Commits

Reviewing files that changed from the base of the PR and between d094c77 and 4e6e49c.

📒 Files selected for processing (1)
  • zstd/encoder.go

📝 Walkthrough

Adds prefix-based encoder reset APIs and implements a parallel, job-based concurrent-blocks encoding pipeline (workers, flusher, buffer pools). Dictionary-backed encoders disallow prefix resets, and concurrent-blocks is disabled when a dictionary is present or encoder concurrency is ≤ 1.

Changes

| Cohort / File(s) | Summary |
|------------------|---------|
| Prefix reset implementations: `zstd/enc_base.go`, `zstd/enc_best.go`, `zstd/enc_better.go`, `zstd/enc_dfast.go`, `zstd/enc_fast.go` | Added unexported `resetBasePrefix(prefix []byte)` and `ResetPrefix(prefix []byte)` on several encoder types to seed history from an explicit prefix and prepopulate match/hash tables. Dict-backed encoder variants implement `ResetPrefix` to panic (unsupported). |
| Concurrent jobs pipeline: `zstd/enc_jobs.go`, `zstd/enc_jobs_test.go` | Introduced job orchestration types (`encJob`, `jobState`), worker goroutines, an ordered `jobFlusher`, buffer pools, dispatch/shutdown logic, a compression worker loop (`compressJob`), and extensive tests/benchmarks validating concurrent-blocks behavior and regressions. |
| Encoder integration & options: `zstd/encoder.go`, `zstd/encoder_options.go`, `zstd/encoder_test.go` | Extended the encoder interface with `ResetPrefix`; added persistent `encoderState.jobs`; introduced `WithConcurrentBlocks(bool)`, `jobSize()`, and `overlapSize()` helpers; gated concurrent-blocks when a dict is present or concurrency ≤ 1; routed `Write`/`ReadFrom`/`Flush`/`Close` through job-based paths when enabled; tests updated to include concurrent-block variants. |
| Docs: `zstd/README.md` | Added "Parallel Stream Compression" documentation and clarified stream-concurrency defaults, overlap handling, ordered flushing, dictionary incompatibility, and a usage example. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Client
    participant Enc as Encoder
    participant Jobs as JobState
    participant Worker as Worker
    participant Flusher as Flusher
    participant W as Writer

    Client->>Enc: Write(data)
    Enc->>Jobs: buffer into filling
    alt buffer full or final
        Enc->>Jobs: dispatchJob(seq, prefix, input)
        Jobs->>Worker: workCh: job
        Worker->>Worker: compressJob(prefix, input)
        Worker->>Jobs: resultCh: completed job
    end
    Flusher->>Jobs: receive next completed job (in-order)
    Flusher->>W: Write(compressed bytes)
    Client->>Enc: Close()
    Enc->>Flusher: waitAllJobs / shutdown
```
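The flow above can be sketched with stdlib primitives only: workers drain a work channel concurrently, and a single flusher buffers out-of-order completions so output is written strictly in sequence order. `encJob` and `runPipeline` are illustrative stand-ins (upper-casing plays the role of `compressJob`), not the PR's actual types:

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
	"sync"
)

type encJob struct { // illustrative, not the PR's actual type
	seq int
	out []byte
}

func runPipeline(blocks []string, workers int) string {
	work := make(chan encJob, len(blocks))
	results := make(chan encJob, len(blocks))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range work {
				// Stand-in for compressJob: transform the input.
				j.out = []byte(strings.ToUpper(string(j.out)))
				results <- j // completion order is arbitrary
			}
		}()
	}
	for i, b := range blocks {
		work <- encJob{seq: i, out: []byte(b)}
	}
	close(work)
	go func() { wg.Wait(); close(results) }()

	// Ordered flusher: hold out-of-order completions, emit in seq order.
	var out bytes.Buffer
	pending := map[int][]byte{}
	next := 0
	for r := range results {
		pending[r.seq] = r.out
		for b, ok := pending[next]; ok; b, ok = pending[next] {
			out.Write(b)
			delete(pending, next)
			next++
		}
	}
	return out.String()
}

func main() {
	fmt.Println(runPipeline([]string{"job0 ", "job1 ", "job2 ", "job3 "}, 3))
	// → JOB0 JOB1 JOB2 JOB3
}
```

However the goroutines interleave, the flusher's `pending` map guarantees the concatenated output matches input order, which is what lets the stream remain a valid single frame.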

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 11.43%, which is insufficient; the required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped: CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The title 'zstd: Add true concurrent stream encoding' accurately and concisely describes the main change: introducing concurrent block compression via the WithConcurrentBlocks option. |


@coderabbitai (Contributor) coderabbitai bot left a comment

🧹 Nitpick comments (4)
zstd/enc_jobs_test.go (3)

234-236: Unchecked error return from Write.

The error from enc1.Write(input) is ignored. While subsequent Close() and DecodeAll would likely catch issues, checking the error provides clearer failure diagnostics.

Suggested fix:

```diff
-			enc1.Write(input)
+			if _, err := enc1.Write(input); err != nil {
+				t.Fatal(err)
+			}
 			if err := enc1.Close(); err != nil {
```

Same pattern applies to enc2.Write(input) on line 249.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@zstd/enc_jobs_test.go` around lines 234 - 236, The test ignores the error
return from enc1.Write(input) (and similarly enc2.Write(input)), which loses
failure context; update the test to capture and check the returned error from
enc1.Write and enc2.Write (e.g., if err := enc1.Write(input); err != nil {
t.Fatal(err) }) so failures are reported immediately and clearly, keeping the
existing Close()/DecodeAll checks intact; modify the Write calls in the enc1 and
enc2 test blocks to perform this error check.

85-86: Subtest name missing size and concurrency parameters.

The subtest name only includes the level, which will result in duplicate subtest names across different size/concurrency combinations. This makes it harder to identify which specific combination failed.

Suggested fix:

```diff
-				name := level.String()
+				name := fmt.Sprintf("%s-size%d-conc%d", level.String(), size, conc)
 				t.Run(name, func(t *testing.T) {
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@zstd/enc_jobs_test.go` around lines 85 - 86, The subtest name only uses
level.String(), causing duplicate names across different size and concurrency
loops; update the name construction in enc_jobs_test.go (where name :=
level.String() and t.Run(name, ... ) are used) to include the size and
concurrency values (e.g., append or format size and concurrency into the name)
so each subtest is uniquely identified by level, size, and concurrency.

1175-1200: Missing assertion for fullZero=false case.

When fullZero is false, the test closes the encoder but doesn't verify the expected behavior. Consider adding an assertion to verify the output is empty (no frame written) for this case.

Suggested improvement:

```diff
 		if fullZero {
 			if buf.Len() == 0 {
 				t.Fatal("expected non-empty output with fullZero")
 			}
 			decoded, err := dec.DecodeAll(buf.Bytes(), nil)
 			if err != nil {
 				t.Fatal("decode error:", err)
 			}
 			if len(decoded) != 0 {
 				t.Fatalf("expected empty decoded, got %d bytes", len(decoded))
 			}
+		} else {
+			if buf.Len() != 0 {
+				t.Fatalf("expected empty output without fullZero, got %d bytes", buf.Len())
+			}
 		}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@zstd/enc_jobs_test.go` around lines 1175 - 1200, Add an assertion for the
fullZero == false branch in the test loop: after closing the encoder created via
NewWriter(..., WithZeroFrames(fullZero)) assert that buf.Len() == 0 (and
optionally that dec.DecodeAll(buf.Bytes(), nil) returns an empty decoded slice
or no frames) to verify no frame was written when WithZeroFrames is false;
update the test near the loop over fullZero to check buf.Len() and decoded
length for the false case using the existing buf and dec.DecodeAll calls.
zstd/enc_jobs.go (1)

74-80: Consider including stack trace in the error.

The stack trace is printed to stderr but not captured in the error. In production environments, stderr output might be lost. Consider including a truncated stack trace in the error message for better debuggability.

Suggested improvement:

```diff
 	defer func() {
 		if r := recover(); r != nil {
-			job.err = fmt.Errorf("panic in parallel job: %v", r)
-			rdebug.PrintStack()
+			job.err = fmt.Errorf("panic in parallel job: %v\n%s", r, rdebug.Stack())
 		}
 	}()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@zstd/enc_jobs.go` around lines 74 - 80, In compressJob, when recovering from
a panic you currently call rdebug.PrintStack() but don't attach the stack to
job.err; capture the stack (e.g., via debug.Stack()), truncate if very large,
and include it in the formatted error assigned to job.err (so job.err contains
both the panic value and a short stack trace). Update compressJob’s defer
recovery block to build the combined message and assign it to job.err instead of
only calling rdebug.PrintStack().

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 32d48d40-e809-431b-8db3-488c4e1eac6d

📥 Commits

Reviewing files that changed from the base of the PR and between c5e0077 and 7413259.

📒 Files selected for processing (10)
  • zstd/enc_base.go
  • zstd/enc_best.go
  • zstd/enc_better.go
  • zstd/enc_dfast.go
  • zstd/enc_fast.go
  • zstd/enc_jobs.go
  • zstd/enc_jobs_test.go
  • zstd/encoder.go
  • zstd/encoder_options.go
  • zstd/encoder_test.go

@klauspost klauspost changed the title zstd: Add true concurrent encoding zstd: Add true concurrent stream encoding Mar 20, 2026
@coderabbitai (Contributor) coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@zstd/encoder.go`:
- Around line 569-580: The flushJobs helper (Encoder.flushJobs) currently only
returns jobs.flusherErr and can mask a previously stored stream error
(e.state.err); update flushJobs to, after waitAllJobs and before returning,
check e.state.err and return that error if non-nil (prefer the earlier stream
error over flusherErr), and apply the same change pattern to the other helper(s)
referenced around lines 652-704 (the Close/Flush finalization helpers) so they
honor e.state.err in addition to jobs.flusherErr; use the existing symbols
e.dispatchJob, e.waitAllJobs, js.mu and js.flusherErr to locate and adjust the
logic.
- Around line 598-600: The Close path for concurrent mode is checked after a
guard that wrongly inspects s.filling rather than the concurrent buffer
s.jobs.filling, causing Reset(nil) + Write + Close to incorrectly take the
nil-writer early-return; modify the Close implementation so the concurrent
branch (use e.closeJobs()) is executed before the no-writer check or update the
no-writer check to also examine s.jobs.filling (and any other concurrent
buffers) so that pending bytes in s.jobs.filling are recognized and the function
returns the proper "encoder has no writer" error when appropriate; locate
symbols Close, e.closeJobs, s.filling, s.jobs.filling, Reset, and Write to apply
the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 5fb36ce3-fb2b-43ee-be84-a264794d6168

📥 Commits

Reviewing files that changed from the base of the PR and between 20a57ae and d094c77.

📒 Files selected for processing (4)
  • zstd/README.md
  • zstd/enc_jobs_test.go
  • zstd/encoder.go
  • zstd/encoder_options.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • zstd/encoder_options.go

Close() checked s.w == nil before the e.o.concurrentBlocks branch. In concurrent mode, Write fills js.filling (not s.filling), so the guard len(s.filling) == 0 && s.nInput == 0 would incorrectly return nil even with pending data. Fix: moved the concurrentBlocks branch before the nil-writer check, and added a matching nil-writer guard inside closeJobs() that checks js.filling instead of s.filling.
@klauspost (Owner, Author) commented
The main constraint on scaling is CPU cache thrashing: with many concurrent workers, the hash tables can no longer stay in the L3 cache.

Halving the "better" table size:

| better      | c1 MB/s | c4∥ MB/s    | c16∥ MB/s   | c1 ratio | c16∥ ratio |
|-------------|--------:|------------:|------------:|---------:|-----------:|
| original    | 386     | 810 (2.1×)  | 1697 (4.4×) | 9.14%    | 9.21%      |
| halved long | 369     | 1391 (3.8×) | 2867 (7.8×) | 9.26%    | 9.33%      |
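A back-of-the-envelope sketch of that constraint: each concurrent worker carries its own match/hash tables, so the aggregate footprint grows linearly with workers and eventually spills out of a shared L3 cache. The table and cache sizes below are illustrative assumptions, not the library's actual dimensions:

```go
package main

import "fmt"

const (
	tableEntries = 1 << 19  // hypothetical long-match table entries
	entryBytes   = 8        // hypothetical bytes per table entry
	l3Bytes      = 32 << 20 // example 32 MiB shared L3 cache
)

// tablesFit reports whether the combined per-worker tables still fit in
// the assumed L3 cache.
func tablesFit(workers int) bool {
	return workers*tableEntries*entryBytes <= l3Bytes
}

func main() {
	for _, w := range []int{1, 4, 16} {
		fmt.Printf("%2d workers: %d KiB, fits in L3: %v\n",
			w, w*tableEntries*entryBytes>>10, tablesFit(w))
	}
}
```

Under these assumptions the 16-worker case overflows L3, which is consistent with why halving the "better" long table trades a little ratio for much better parallel throughput.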
