
Conversation

@bsergean
Contributor

@bsergean bsergean commented May 5, 2022

No description provided.

@bsergean
Contributor Author

bsergean commented May 5, 2022

zstd might support parallel decompression; if that's the case, maybe we could put the same API on the reader.

@Viq111
Collaborator

Viq111 commented May 6, 2022

Thanks for your contribution!
Would you mind adding a test that checks that the error returned is nil, and maybe, via ZSTD_CCtx_getParameter, that the parameter is correctly set?

@bsergean
Contributor Author

bsergean commented May 6, 2022 via email

@bsergean
Contributor Author

bsergean commented May 7, 2022

zstd$ go test -run TestStreamSetNbWorkers
--- FAIL: TestStreamSetNbWorkers (0.00s)
    zstd_stream_test.go:407: Expected SetNbWorkers to succeed, got Unsupported parameter instead

Interestingly, my unit test started failing as-is: the parameter was seen as unsupported. This is because the library was not built in threading mode, which I changed. I don't know if that's acceptable, but it's a requirement for this PR.

@bsergean
Contributor Author

bsergean commented May 7, 2022

I'm getting that error in the unit test in the efence build -> https://app.circleci.com/pipelines/github/DataDog/zstd/179/workflows/5f082f6d-b02f-4434-981c-9bacb0297e85/jobs/916

    zstd_stream_test.go:19: Failed writing to compress object: Unsupported parameter

If I try to run efence locally, I'm getting a panic ...

GODEBUG=efence=1 go test -run TestStreamSetNbWorkers

Collaborator

@Viq111 Viq111 left a comment


Sorry for the delayed answer.

For the dynamic library not built with -DZSTD_MULTITHREAD=1, I think it's fair to assume that we may encounter such a library in the wild from time to time.
I think it's fine, but we should make the error more user-friendly. What about introducing an error, ErrNoParallelSupport, and returning it when the caller uses SetNbWorkers but the underlying library doesn't support it?
In tests you can then do a:

if err := writer.SetNbWorkers(nbWorkers); err == ErrNoParallelSupport {
  t.Skip()
}
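For reference, a rough sketch of what the error side could look like (hypothetical names: setParameter and nbWorkersParam stand in for the cgo plumbing around ZSTD_CCtx_setParameter, and the error message is illustrative):

// Hypothetical sketch, not the actual implementation: translate zstd's
// "Unsupported parameter" result into a sentinel error (needs the errors import).
var ErrNoParallelSupport = errors.New("operation not supported: library built without multi-threading")

func (w *Writer) SetNbWorkers(n int) error {
	// setParameter stands in for the cgo call to ZSTD_CCtx_setParameter with
	// ZSTD_c_nbWorkers; it fails when libzstd lacks -DZSTD_MULTITHREAD=1.
	if err := w.setParameter(nbWorkersParam, n); err != nil {
		return ErrNoParallelSupport
	}
	return nil
}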

What do you think ?

writer := NewWriterLevelDict(&w, DefaultCompression, dict)
_, err := writer.Write(payload)

err := writer.SetNbWorkers(nbWorkers)
Collaborator


I would probably only call this if nbWorkers > 1, so you can also test the (most common) path where the user doesn't call this method.
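A sketch of what that guard could look like (reusing writer and nbWorkers from the test, plus the ErrNoParallelSupport idea from above):

// Only enable workers when the test case asks for parallelism, so the default
// path (no SetNbWorkers call) stays covered too.
if nbWorkers > 1 {
	if err := writer.SetNbWorkers(nbWorkers); err == ErrNoParallelSupport {
		t.Skip("libzstd built without multi-threading support")
	} else if err != nil {
		t.Fatalf("SetNbWorkers failed: %s", err)
	}
}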

Contributor Author


I think this is addressed.

_, err := writer.Write(payload)

err := writer.SetNbWorkers(nbWorkers)
failOnError(t, "Failed writing to compress object", err)
Collaborator


as mentioned, you might want to skip the test (for nbWorkers > 1) if the error is that the library doesn't support multi-threading

Contributor Author


I think this is addressed.

@bsergean
Contributor Author

bsergean commented May 11, 2022 via email

@Viq111
Collaborator

Viq111 commented May 11, 2022

This PR added support for compiling against either the dynamic library or the static library (documented in the README here).
The default (no compile flags) is to build statically.

@bsergean
Contributor Author

bsergean commented May 11, 2022 via email

@bsergean
Contributor Author

I introduced the new error type you suggested (made it exported by giving it a starting capital letter).

All the tests are passing now; I wonder whether there was a fluke previously, or whether one environment has an external library that was built without parallel support, which would explain it.

Lastly, I have not exercised that code very much or measured the performance impact (I know it uses multiple threads, and that we have to go through C for that).

Collaborator

@Viq111 Viq111 left a comment


Thanks!
I will do some benchmarking on different hardware and let you know, but the PR looks good!

@bsergean
Contributor Author

bsergean commented May 26, 2022 via email

@Viq111
Collaborator

Viq111 commented Jun 6, 2022

edit: this comment is incorrect, see latest

I have been benchmarking with the mozilla payload (un-bzip2'd then re-tarred), the biggest one.

On x86, the performance is always better, albeit much more variable.
Without parallelism:

goos: darwin
goarch: amd64
pkg: github.com/DataDog/zstd
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkStreamCompression-12    	      85	 392153074 ns/op	 135.01 MB/s
BenchmarkStreamCompression-12    	      82	 389099060 ns/op	 136.07 MB/s
BenchmarkStreamCompression-12    	      86	 396593782 ns/op	 133.50 MB/s
BenchmarkStreamCompression-12    	      88	 387180328 ns/op	 136.74 MB/s
BenchmarkStreamCompression-12    	      88	 388614650 ns/op	 136.24 MB/s

With parallelism = 8:

goos: darwin
goarch: amd64
pkg: github.com/DataDog/zstd
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkStreamCompression-12    	     100	 323739454 ns/op	 163.54 MB/s
BenchmarkStreamCompression-12    	     132	 260639012 ns/op	 203.13 MB/s
BenchmarkStreamCompression-12    	     130	 266204210 ns/op	 198.89 MB/s
BenchmarkStreamCompression-12    	     139	 263071863 ns/op	 201.25 MB/s
BenchmarkStreamCompression-12    	     159	 442939701 ns/op	 119.53 MB/s

Summary:

name                  old time/op   new time/op   delta
StreamCompression-12    391ms ± 2%    311ms ±42%   ~     (p=0.151 n=5+5)

name                  old speed     new speed     delta
StreamCompression-12  136MB/s ± 1%  177MB/s ±33%   ~     (p=0.151 n=5+5)

However, on ARM (testing on a c7g.4xlarge) with parallelism = 8 on 16 cores, I unfortunately get OOMs, so I am looking into it:

goos: linux
goarch: arm64
pkg: github.com/DataDog/zstd
BenchmarkStreamCompression-16    	Killed

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@Viq111
Collaborator

Viq111 commented Jun 6, 2022

Yeah, same expectation here, but the mozilla payload is ~50MB, and with 8 concurrent threads I would not expect it to exhaust the 32GiB of memory a c7g.4xlarge has.

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@Viq111
Collaborator

Viq111 commented Jun 6, 2022

👍 sounds good, here are my current testing steps:

Add w.SetNbWorkers(8) to the benchmark

then

wget https://sun.aei.polsl.pl//~sdeor/corpus/mozilla.bz2
tar -xf mozilla.bz2 mozilla
tar cf mozilla.tar mozilla
export PAYLOAD=`pwd`/mozilla.tar
go test -c
./zstd.test -test.run None -test.bench BenchmarkStreamCompression -test.benchtime 30s -test.count 5

On a c7g.4xlarge, you can see it uses all (32GiB) of the memory:

$ grep ^VmPeak /proc/$(pidof zstd.test)/status
VmPeak:	33758272 kB

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@Viq111
Collaborator

Viq111 commented Jun 6, 2022

Sorry for my previous message: on a c6i.4xlarge (x86) I see the same consumption:

grep ^VmPeak /proc/$(pidof zstd.test)/status
VmPeak:	33356928 kB

so I am going to look at the C code side indeed

@Viq111
Collaborator

Viq111 commented Jun 6, 2022

OK, I think I found the "issue".
See 9dd8a8a
A gotcha is that when setting nbWorkers > 1, the compress C call becomes asynchronous instead of synchronous. That means that with the Go benchmark harness increasing N, we would buffer a lot of data on the C side.

By doing 9dd8a8a and forcing a flush every 1GiB, we ensure the C-side zstd buffer never holds more than 1GiB at a time.

This works:

grep ^VmPeak /proc/12092/status
VmPeak:  8307060 kB

Memory usage is 8GiB, which is exactly 1GiB for each of the 8 workers.

So there is nothing wrong with the code; the benchmark just did not take the async behavior into account.
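A sketch of the idea behind the fix (not the literal commit 9dd8a8a; w and payload stand for the benchmark's existing writer and input):

// Flush after every ~1GiB written so the asynchronous C-side buffer stays
// bounded instead of growing with b.N.
const flushThreshold = 1 << 30 // 1GiB

written := 0
for i := 0; i < b.N; i++ {
	n, err := w.Write(payload)
	if err != nil {
		b.Fatal(err)
	}
	written += n
	if written >= flushThreshold {
		if err := w.Flush(); err != nil {
			b.Fatal(err)
		}
		written = 0
	}
}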

The good news is that the performance with 8 workers is much better than with 1 worker:

name                  old time/op   new time/op   delta
StreamCompression-16    414ms ± 1%     98ms ± 2%   -76.37%  (p=0.008 n=5+5)

name                  old speed     new speed     delta
StreamCompression-16  124MB/s ± 1%  524MB/s ± 2%  +323.25%  (p=0.008 n=5+5)

Maybe just add a comment to SetNbWorkers to warn about this behavior ?

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@Viq111
Collaborator

Viq111 commented Jun 6, 2022

Sorry, the one above is actually a benchmark on x86 (editing my message to reflect it):

x86 (c6i.4xlarge):

name                  old time/op   new time/op   delta
StreamCompression-16    414ms ± 1%     98ms ± 2%   -76.37%  (p=0.008 n=5+5)

name                  old speed     new speed     delta
StreamCompression-16  124MB/s ± 1%  524MB/s ± 2%  +323.25%  (p=0.008 n=5+5)

so 3.2x increase (for 8 workers)

ARM (c7g.4xlarge):

name                  old time/op   new time/op   delta
StreamCompression-16    513ms ± 1%     74ms ± 1%   -85.61%  (p=0.008 n=5+5)

name                  old speed     new speed     delta
StreamCompression-16  100MB/s ± 1%  694MB/s ± 1%  +594.94%  (p=0.008 n=5+5)

So ~6x increase (for 8 workers)

So this is not an ARM-only behavior; rather, calls to writer.Write(...) actually become async.
So maybe add a comment of the sort:

// Set the number of workers to run the compression in parallel using multiple threads.
// If > 1, the Write() call becomes asynchronous. This means data will be buffered until processed.
// If you call Write() too fast, you might incur a memory buffer as large as your input.
// Consider calling Flush() periodically if you need to compress a very large file that would not all fit in memory.

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@Viq111
Collaborator

Viq111 commented Jun 6, 2022

I would leave the API as-is because it gives the user the choice to call Flush whenever. As a heads-up, calling Flush() makes the payload a bit bigger, as you are forcing zstd to flush as much data as possible.
For your code, you need to ensure either that f is small enough to fit in memory (the pathological case in the benchmark is that it was generating massive files), OR you can create an intermediate io.Writer (similar to LimitedReader) that periodically calls Flush(), as sketched below, but I think that should live outside of the zstd package.
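A self-contained sketch of such an intermediate writer (hypothetical code living outside the package; it only assumes the writer's existing Write and Flush methods):

package zstdutil

import "github.com/DataDog/zstd"

// flushingWriter wraps a zstd.Writer and calls Flush() once `threshold`
// bytes have been written since the last flush, keeping the asynchronous
// C-side buffer bounded when nbWorkers > 1.
type flushingWriter struct {
	w         *zstd.Writer
	threshold int
	pending   int
}

func (fw *flushingWriter) Write(p []byte) (int, error) {
	n, err := fw.w.Write(p)
	if err != nil {
		return n, err
	}
	fw.pending += n
	if fw.pending >= fw.threshold {
		fw.pending = 0
		return n, fw.w.Flush()
	}
	return n, nil
}

Usage would then be something like io.Copy(&flushingWriter{w: zw, threshold: 1 << 30}, f), trading a slightly bigger payload (as noted above) for bounded memory.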

Collaborator

@Viq111 Viq111 left a comment


Thanks for bearing with me as we benchmarked!
The PR as it stands looks good to me, and if you are fine with the API, we can merge it in its current state.

@bsergean
Contributor Author

bsergean commented Jun 6, 2022 via email

@Viq111 Viq111 merged commit fd035e5 into DataDog:1.x Jun 6, 2022
kodiakhq bot referenced this pull request in cloudquery/cloudquery Jan 1, 2023
This PR contains the following updates:

| Package | Type | Update | Change |
|---|---|---|---|
| [github.com/DataDog/zstd](https://togithub.com/DataDog/zstd) | indirect | patch | `v1.5.0` -> `v1.5.2` |

---

### Release Notes

<details>
<summary>DataDog/zstd</summary>

### [`v1.5.2`](https://togithub.com/DataDog/zstd/releases/tag/v1.5.2): zstd 1.5.2

[Compare Source](https://togithub.com/DataDog/zstd/compare/v1.5.2...v1.5.2)

This release updates the upstream zstd version to [1.5.2](https://togithub.com/facebook/zstd/releases/tag/v1.5.2) ([https://github.com/DataDog/zstd/pull/116](https://togithub.com/DataDog/zstd/pull/116))

The update `1.5.0` -> `1.5.2` overall has a similar performance profile. Please note that depending on the workload, performance could vary by -10% / +10%

### [`v1.5.2+patch1`](https://togithub.com/DataDog/zstd/releases/tag/v1.5.2%2Bpatch1): zstd 1.5.2 - wrapper patches 1

[Compare Source](https://togithub.com/DataDog/zstd/compare/v1.5.0...v1.5.2)

#### What's Changed

-   Fix unneededly allocated large decompression buffer by [@&#8203;XiaochenCui](https://togithub.com/XiaochenCui) ([#&#8203;118](https://togithub.com/DataDog/zstd/issues/118)) & [@&#8203;Viq111](https://togithub.com/Viq111) in [https://github.com/DataDog/zstd/pull/120](https://togithub.com/DataDog/zstd/pull/120)
-   Add SetNbWorkers api to the writer code (see [#&#8203;108](https://togithub.com/DataDog/zstd/issues/108)) by [@&#8203;bsergean](https://togithub.com/bsergean) in [https://github.com/DataDog/zstd/pull/117](https://togithub.com/DataDog/zstd/pull/117)
    -   For large workloads, the performance can be improved by 3-6x (see [https://github.com/DataDog/zstd/pull/117#issuecomment-1147812767](https://togithub.com/DataDog/zstd/pull/117#issuecomment-1147812767))
    -   `Write()` becomes async with workers > 1, make sure you read the method documentation before using

#### New Contributors

-   [@&#8203;bsergean](https://togithub.com/bsergean) made their first contribution in [https://github.com/DataDog/zstd/pull/117](https://togithub.com/DataDog/zstd/pull/117)
-   [@&#8203;XiaochenCui](https://togithub.com/XiaochenCui) for his work on [https://github.com/DataDog/zstd/pull/118](https://togithub.com/DataDog/zstd/pull/118) that led to [#&#8203;120](https://togithub.com/DataDog/zstd/issues/120)

**Full Changelog**: DataDog/zstd@v1.5.2...v1.5.2+patch1

</details>

---


This PR has been generated by [Renovate Bot](https://togithub.com/renovatebot/renovate).