Add buffering wrapper around WriteSyncer #782
Conversation
Codecov Report
```diff
@@            Coverage Diff             @@
##           master     #782      +/-   ##
==========================================
- Coverage   98.36%   98.25%   -0.12%
==========================================
  Files          43       43
  Lines        2390     1943     -447
==========================================
- Hits         2351     1909     -442
+ Misses         32       27       -5
  Partials        7        7
==========================================
```
Continue to review full report at Codecov.
Thank you for the contribution @hnlq715.
I think there are a few issues to look into:
- This change buffers, but does not guarantee that the logger will never block on disk I/O. It's worth considering whether that's important, or whether the goal is just to improve performance by reducing write syscalls.
- This change is prone to races. I think we need better testing that would have detected the existing races.
- I think the buffer size and flush period should be configurable.
We may want a concurrent test that writes random sized payloads and verifies that payloads are never split up across multiple writes.
Sorry for the delay, I was out for a while.
Any update?
I'm coming back… sorry for the delay.
Do we have tests to ensure that small writes are buffered and only written to the underlying Writer periodically or after the size limit is hit?
Yes, we have test cases for these situations above.
Any updates? 😃
Any update about this PR?
Also, seeing as this is the first piece of non-test code in Zap that spawns a goroutine, we should use goleak to test for leaks.
Any updates? Appreciate all the work being done.
Thanks for the comments from @abhinav.
Looking forward to this being merged. Appreciate all the hard work @hnlq715. @prashantv @abhinav do you have an ETA for when this will make the mainline? Appreciate it.
Any updates?
Looking forward to getting this merged as well!
zapcore/write_syncer.go
Outdated
```go
// The flush interval defaults to 30 seconds if set to zero.
func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) {
	if _, ok := ws.(*bufferWriterSyncer); ok {
		// no need to layer on another buffer
```
Oh, I missed this before. I think silently ignoring the call here would be surprising. We could extract and wrap the underlying buffer, but that might be unnecessarily messy.

We should just double wrap here. WriteSyncers are usually not constructed through logic so complicated that the risk of unintentional double wrapping is high.
zapcore/write_syncer.go
Outdated
```go
	if _, ok := ws.(*bufferWriterSyncer); ok {
		// no need to layer on another buffer
		return ws, func() error { return nil }
	}
```
Suggested change (delete these lines):

```go
	if _, ok := ws.(*bufferWriterSyncer); ok {
		// no need to layer on another buffer
		return ws, func() error { return nil }
	}
```
Alright, we can just double wrap this buffer syncer directly.
zapcore/write_syncer_test.go
Outdated
```go
	bws.Lock()
	assert.Equal(t, "foofoo", buf.String(), "Unexpected log string")
	bws.Unlock()
```
Why is the lock necessary here? There should be nothing to flush, so the buf should not be mutated while we read it (and if it does get mutated, that's likely a real race issue to investigate)?
First, bytes.Buffer is not goroutine safe.
We write the buf in goroutine 20 and read it in goroutine 19; the race log below says it all.
The race only shows up in this test case, because we need to check what the buf contains via buf.String(). To keep the test from failing, adding a lock here works fine :-)
```
Read at 0x00c000117560 by goroutine 19:
  bytes.(*Buffer).String()
      /home/liqi/workspace/go/src/bytes/buffer.go:65 +0x411
  go.uber.org/zap/zapcore.TestBufferWriter.func6()
      /home/liqi/workspace/zap/zapcore/write_syncer_test.go:134 +0x1a4
  testing.tRunner()
      /home/liqi/workspace/go/src/testing/testing.go:1127 +0x202

Previous write at 0x00c000117560 by goroutine 20:
  bytes.(*Buffer).grow()
      /home/liqi/workspace/go/src/bytes/buffer.go:128 +0x484
  bytes.(*Buffer).Write()
      /home/liqi/workspace/go/src/bytes/buffer.go:172 +0x184
  go.uber.org/zap/zapcore.(*writerWrapper).Write()
      <autogenerated>:1 +0x87
  bufio.(*Writer).Flush()
      /home/liqi/workspace/go/src/bufio/bufio.go:607 +0x13c
  go.uber.org/zap/zapcore.(*bufferWriterSyncer).Sync()
      /home/liqi/workspace/zap/zapcore/write_syncer.go:161 +0xb8
  go.uber.org/zap/zapcore.(*bufferWriterSyncer).flushLoop()
      /home/liqi/workspace/zap/zapcore/write_syncer.go:173 +0x87

Goroutine 19 (running) created at:
  testing.(*T).Run()
      /home/liqi/workspace/go/src/testing/testing.go:1178 +0x796
  go.uber.org/zap/zapcore.TestBufferWriter()
      /home/liqi/workspace/zap/zapcore/write_syncer_test.go:128 +0x1cc
  testing.tRunner()
      /home/liqi/workspace/go/src/testing/testing.go:1127 +0x202

Goroutine 20 (running) created at:
  go.uber.org/zap/zapcore.Buffer()
      /home/liqi/workspace/zap/zapcore/write_syncer.go:133 +0x32b
  go.uber.org/zap/zapcore.TestBufferWriter.func6()
      /home/liqi/workspace/zap/zapcore/write_syncer_test.go:130 +0xde
  testing.tRunner()
      /home/liqi/workspace/go/src/testing/testing.go:1127 +0x202
```
zapcore/write_syncer_test.go
Outdated
```go
	})

	t.Run("flush error", func(t *testing.T) {
		ws, close := Buffer(AddSync(&errorWriter{}), 4, time.Nanosecond)
```
Why such a short flush timer here? We're relying on the buffer size limit to flush rather than the background timer, I think?
Absolutely, we can definitely set this timer to 0.
This PR has stalled because we need to follow up with some changes
before we merge, and have not had a chance to do that yet. Posting
this for visibility. The gist of the problem is:
There's a data race in the flush test for the buffered WriteSyncer.
Adding a lock on the underlying Buffer makes the test pass, but it
doesn't address the data race to a degree we are satisfied with.
We believe that the best way forward is to build upon the work done in
#897 and add the ability to build tickers to the Clock interface, and
with control of a fake ticker, exercise the flush path in the tests.
Note that because we're in zapcore, the Clock interface will have to be
moved to zapcore and re-exported from the top-level zap package.
Adding NewTicker to the Clock interface is not straightforward because time.Ticker is a struct whose internals we do not have much control over. So we have to decide whether the NewTicker method on Clock would return a *time.Ticker or a custom Ticker struct (similar to benbjohnson/clock).
Besides the interface design, another thing to decide is how to instantiate the buffer. Previously, with the three arguments of the Buffer function, two of which were optional, we were already pushing it. With clock, which is also optional, we'd have 4 arguments to the function, 3 of which can be omitted. At that point we'd want a different way of instantiating the WriteSyncer. We considered a couple of ideas.
-
An exposed struct with unexported fields for internal state:

```go
type BufferedWriteSyncer struct {
	WriteSyncer
	Size          int
	FlushInterval time.Duration
	Clock         Clock

	// unexported fields for state
}
```

This would require checking whether the internal state of the buffer has been initialized on every Write call. Given that we already have a lock on the WriteSyncer, it would be a simple boolean check, so it would not be overly expensive.

Usage to wrap a WriteSyncer with the default configuration would be:

```go
ws := &BufferedWriteSyncer{WriteSyncer: ws}
```
-
A configuration struct with a Wrap method:

```go
type BufferConfig struct {
	Size          int
	FlushInterval time.Duration
	Clock         Clock
}

func (BufferConfig) Wrap(WriteSyncer) WriteSyncer
```

Usage to wrap a WriteSyncer with the default configuration would be:

```go
ws := BufferConfig{}.Wrap(ws)
```
-
Functional options:

```go
type BufferOption

func BufferSize(int) BufferOption
func BufferFlushInterval(time.Duration) BufferOption
func BufferClock(Clock) BufferOption
func Buffer(ws WriteSyncer, opts ...BufferOption) WriteSyncer
```

Usage to wrap a WriteSyncer with the default configuration would be:

```go
ws := Buffer(ws)
```

This would pollute the zapcore namespace too heavily, so we're disinclined to use it.
I'm personally leaning towards (1) but I'm not fully sold on it.
So in short, our problems are:
- Data race in the flush test
- Control over the source of time
- Clock interface design
- Buffer instantiation API
Unfortunately, as I mentioned before, we haven't had a chance to follow
up with the changes necessary for this, and I expect we won't get to it
for a couple more weeks at least.
Thanks for the detailed write-up; I personally prefer option 1 :-) I would like to wait for your Clock interface implementation.
Any updates? @abhinav
Oops, I missed that. Yeah, we're working on it. #948 is the first piece of this, which will allow @moisesvega to implement the test without the data race.
In #897, we added a Clock interface to allow control over the source of time for operations that require accessing the current time. In #782, we discovered that this interface also needs the ability to construct tickers so that we can use it for the buffered writer. This change adds NewTicker to the Clock interface for Zap and moves it to the zapcore package as it will be needed for #782. Note that since we have not yet tagged a release of Zap with #897, this is not a breaking change. Co-authored-by: Minho Park <minho.park@uber.com> Co-authored-by: Abhinav Gupta <abg@uber.com>
Here is the PR with the newly proposed Buffer instantiation API, using the new Clock interface to remove the data race in the test cases.
Nice work :-)
Fixes #663

This change adds `Buffer`, which wraps a `WriteSyncer` with buffering. It uses `bufio.Writer` to buffer in memory and flushes periodically (or when the buffer is full). The `Sync` method forces an immediate flush.

This can improve performance by amortizing any fixed overheads of the underlying `WriteSyncer`.

Benchmark results