```go
// WithConcurrentBlocks enables job-based parallel compression for streams.
// When enabled and concurrent > 1, input is split into large sections (jobs)
// that are compressed simultaneously by multiple goroutines.
// Each non-first job receives an overlap prefix from the previous job for match context.
// Output is flushed in order, producing a valid single-frame zstd stream.
//
// This is incompatible with dictionary encoding.
// Cannot be changed with ResetWithOptions.
func WithConcurrentBlocks(b bool) EOption {
```
Example:
| Level | 1 thread | 4 threads | 16 threads | 1T ratio | 16T ratio |
|---------|:----------:|:------------------:|:-------------------:|:--------:|:---------:|
| fastest | 783 MB/s | 2950 MB/s (3.8×) | 6939 MB/s (8.9×) | 12.24% | 12.26% |
| default | 728 MB/s | 2533 MB/s (3.5×) | 5340 MB/s (7.3×) | 10.67% | 10.68% |
| better | 434 MB/s | 1105 MB/s (2.5×) | 2206 MB/s (5.1×) | 9.14% | 9.21% |
| best | 129 MB/s | 367 MB/s (2.8×) | 884 MB/s (6.8×) | 8.48% | 8.63% |
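The speedup figures in parentheses follow directly from the throughput columns. A small sketch of the arithmetic, using the "fastest" row from the table above (numbers copied from the table, not re-measured; the efficiency metric is my addition):

```go
package main

import "fmt"

func main() {
	// Throughputs in MB/s for the "fastest" level, from the table above.
	oneThread := 783.0
	sixteenThreads := 6939.0

	// Speedup = parallel throughput / single-thread throughput.
	speedup := sixteenThreads / oneThread
	// Parallel efficiency = speedup / thread count.
	efficiency := speedup / 16

	fmt.Printf("speedup: %.1fx, efficiency: %.0f%%\n", speedup, efficiency*100)
}
```

Note that the compression ratio barely changes with thread count (e.g. 12.24% vs 12.26% for "fastest"), since each job only loses a little match context at its boundary.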
No actionable comments were generated in the recent review. 🎉
📝 Walkthrough

Adds prefix-based encoder reset APIs and implements a parallel job-based concurrent-blocks encoding pipeline (workers, flusher, buffer pools). Dictionary-backed encoders disallow prefix resets, and concurrent-blocks is disabled when a dictionary is present or encoder concurrency ≤ 1.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Client
    participant Enc as Encoder
    participant Jobs as JobState
    participant Worker as Worker
    participant Flusher as Flusher
    participant W as Writer
    Client->>Enc: Write(data)
    Enc->>Jobs: buffer into filling
    alt buffer full or final
        Enc->>Jobs: dispatchJob(seq, prefix, input)
        Jobs->>Worker: workCh: job
        Worker->>Worker: compressJob(prefix, input)
        Worker->>Jobs: resultCh: completed job
    end
    Flusher->>Jobs: receive next completed job (in-order)
    Flusher->>W: Write(compressed bytes)
    Client->>Enc: Close()
    Enc->>Flusher: waitAllJobs / shutdown
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
🧹 Nitpick comments (4)

**zstd/enc_jobs_test.go (3)**

**234-236: Unchecked error return from Write.**

The error from `enc1.Write(input)` is ignored. While the subsequent `Close()` and `DecodeAll` would likely catch issues, checking the error provides clearer failure diagnostics.

Suggested fix:

```diff
-	enc1.Write(input)
+	if _, err := enc1.Write(input); err != nil {
+		t.Fatal(err)
+	}
 	if err := enc1.Close(); err != nil {
```

The same pattern applies to `enc2.Write(input)` on line 249.

**85-86: Subtest name missing size and concurrency parameters.**

The subtest name only includes the level, which results in duplicate subtest names across different size/concurrency combinations. This makes it harder to identify which specific combination failed.

Suggested fix:

```diff
-	name := level.String()
+	name := fmt.Sprintf("%s-size%d-conc%d", level.String(), size, conc)
 	t.Run(name, func(t *testing.T) {
```

**1175-1200: Missing assertion for the `fullZero=false` case.**

When `fullZero` is false, the test closes the encoder but doesn't verify the expected behavior. Consider adding an assertion that the output is empty (no frame written) in this case.

Suggested improvement:

```diff
 	if fullZero {
 		if buf.Len() == 0 {
 			t.Fatal("expected non-empty output with fullZero")
 		}
 		decoded, err := dec.DecodeAll(buf.Bytes(), nil)
 		if err != nil {
 			t.Fatal("decode error:", err)
 		}
 		if len(decoded) != 0 {
 			t.Fatalf("expected empty decoded, got %d bytes", len(decoded))
 		}
+	} else {
+		if buf.Len() != 0 {
+			t.Fatalf("expected empty output without fullZero, got %d bytes", buf.Len())
+		}
 	}
```

**zstd/enc_jobs.go (1)**

**74-80: Consider including the stack trace in the error.**

The stack trace is printed to stderr but not captured in the error. In production environments, stderr output might be lost. Consider including a truncated stack trace in the error message for better debuggability.

Suggested improvement:

```diff
 	defer func() {
 		if r := recover(); r != nil {
-			job.err = fmt.Errorf("panic in parallel job: %v", r)
-			rdebug.PrintStack()
+			job.err = fmt.Errorf("panic in parallel job: %v\n%s", r, rdebug.Stack())
 		}
 	}()
```
📒 Files selected for processing (10)

- zstd/enc_base.go
- zstd/enc_best.go
- zstd/enc_better.go
- zstd/enc_dfast.go
- zstd/enc_fast.go
- zstd/enc_jobs.go
- zstd/enc_jobs_test.go
- zstd/encoder.go
- zstd/encoder_options.go
- zstd/encoder_test.go
Actionable comments posted: 3
Inline comments:

**In `zstd/encoder.go`:**

- **Lines 569-580:** The `flushJobs` helper (`Encoder.flushJobs`) currently returns only `jobs.flusherErr`, which can mask a previously stored stream error (`e.state.err`). After `waitAllJobs` and before returning, check `e.state.err` and return it if non-nil, preferring the earlier stream error over `flusherErr`. Apply the same pattern to the Close/Flush finalization helpers around lines 652-704 so they also honor `e.state.err` in addition to `jobs.flusherErr`.
- **Lines 598-600:** The Close path for concurrent mode is checked after a guard that inspects `s.filling` rather than the concurrent buffer `s.jobs.filling`, so `Reset(nil)` + `Write` + `Close` incorrectly takes the nil-writer early return. Either execute the concurrent branch (`e.closeJobs()`) before the no-writer check, or extend the no-writer check to also examine `s.jobs.filling` (and any other concurrent buffers), so that pending bytes are recognized and the proper "encoder has no writer" error is returned when appropriate.
📒 Files selected for processing (4)

- zstd/README.md
- zstd/enc_jobs_test.go
- zstd/encoder.go
- zstd/encoder_options.go
🚧 Files skipped from review as they are similar to previous changes (1)
- zstd/encoder_options.go
`Close()` checked `s.w == nil` before the `e.o.concurrentBlocks` branch. In concurrent mode, `Write` fills `js.filling` (not `s.filling`), so the guard `len(s.filling) == 0 && s.nInput == 0` would incorrectly return nil even with pending data. Fix: moved the `concurrentBlocks` branch before the nil-writer check, and added a matching nil-writer guard inside `closeJobs()` that checks `js.filling` instead of `s.filling`.
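A stripped-down illustration of the guard-ordering issue. The field names echo the review discussion, but the types here are invented for the sketch and are not the real `Encoder` internals:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// enc is a toy stand-in: sequential writes fill `filling`, while
// concurrent-blocks mode fills `jobsFilling` instead.
type enc struct {
	w                io.Writer
	concurrentBlocks bool
	filling          []byte
	jobsFilling      []byte
}

var errNoWriter = errors.New("encoder has no writer")

// closeBuggy mirrors the reported bug: the nil-writer guard only looks at
// the sequential buffer, so pending concurrent data is silently dropped.
func (e *enc) closeBuggy() error {
	if e.w == nil && len(e.filling) == 0 {
		return nil // wrong: ignores e.jobsFilling
	}
	if e.w == nil {
		return errNoWriter
	}
	return nil
}

// closeFixed consults whichever buffer is actually in use before deciding
// there is nothing to do.
func (e *enc) closeFixed() error {
	pending := e.filling
	if e.concurrentBlocks {
		pending = e.jobsFilling
	}
	if e.w == nil {
		if len(pending) == 0 {
			return nil
		}
		return errNoWriter
	}
	return nil
}

func main() {
	e := &enc{concurrentBlocks: true, jobsFilling: []byte("pending")}
	fmt.Println(e.closeBuggy(), e.closeFixed()) // <nil> encoder has no writer
}
```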
The main constraint for scaling is CPU cache thrashing: with many concurrent encoders, the hash tables can no longer stay in L3 cache. Halving the "better" table size:
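Back-of-the-envelope arithmetic for why the tables stop fitting. The figures below are illustrative assumptions (a 2^19-entry long-match table with 8-byte entries for the "better" level), not values read from the source:

```go
package main

import "fmt"

func main() {
	// Assumed: 2^19 entries of 8 bytes each for the "better" long table.
	const entries = 1 << 19
	const entryBytes = 8

	full := entries * entryBytes
	halved := (entries / 2) * entryBytes

	// Each worker goroutine holds its own table, so the combined working
	// set grows linearly with concurrency and quickly exceeds a shared
	// L3 cache (commonly in the tens of MiB).
	const goroutines = 16
	fmt.Printf("per-encoder table: %d KiB, halved: %d KiB, x%d goroutines: %d MiB\n",
		full/1024, halved/1024, goroutines, goroutines*full/(1024*1024))
}
```

Under these assumptions, halving the table trades a small ratio loss for halving each worker's cache footprint, which is exactly the lever the comment describes.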
Adds true concurrent stream encoding.