Proposing a small change in the WaitStrategy #246
Comments
I'll have a dig into the best way to do this. I think it would be reasonably easy to do with a custom wait strategy without changing the Disruptor itself. One question, though: do you want to know the real capacity (or very close to it)? Because, at the moment, the ring buffer will cache the consumer sequence and only re-fetch the latest value when it wraps the cached value. This is an important optimisation, which I don't really want to lose.
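For illustration, here is a simplified, single-threaded sketch of that caching optimisation (the class and field names are illustrative, not the actual Disruptor source): the producer only re-reads the consumers' sequences when its next claim would wrap the cached value.

import com.lmax.disruptor.Sequence;

// Simplified, single-threaded sketch of the cached gating-sequence
// optimisation described above. Not the real (multi-producer-safe) code.
final class CachedGatingSketch
{
    private final int bufferSize;
    private final Sequence cursor = new Sequence();
    private final Sequence gatingSequenceCache = new Sequence();
    private final Sequence[] gatingSequences; // the consumers' sequences

    CachedGatingSketch(int bufferSize, Sequence... gatingSequences)
    {
        this.bufferSize = bufferSize;
        this.gatingSequences = gatingSequences;
    }

    long next()
    {
        long next = cursor.get() + 1;
        long wrapPoint = next - bufferSize;

        // Fast path: while the claim does not wrap the cached consumer
        // position, no read of the real consumer sequences is needed.
        if (wrapPoint > gatingSequenceCache.get())
        {
            long min = Long.MAX_VALUE;
            for (Sequence s : gatingSequences)
            {
                min = Math.min(min, s.get()); // re-fetch the latest consumer positions
            }
            gatingSequenceCache.set(min);
            // A real implementation waits/spins here while wrapPoint > min.
        }

        cursor.set(next);
        return next;
    }
}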
Would something like the following work for you? You can reuse the TimeoutBlockingWaitStrategy by extension, but skip the typical signalling and only signal at your required threshold.

// Imports assumed at the top of the enclosing file.
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.TimeUnit;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

// Suppress the per-publish wake-up and expose an explicit signal() that the
// producer calls once remaining capacity drops below a threshold. The
// inherited timeout still wakes the consumer periodically.
private static class SleepToCapacityWaitStrategy extends TimeoutBlockingWaitStrategy
{
    public SleepToCapacityWaitStrategy(final long timeout, final TimeUnit units)
    {
        super(timeout, units);
    }

    @Override
    public void signalAllWhenBlocking()
    {
        // Deliberately empty: do not wake the consumer on every publish.
    }

    public void signal()
    {
        super.signalAllWhenBlocking();
    }
}

public static void main(String[] args)
{
    SleepToCapacityWaitStrategy waitStrategy = new SleepToCapacityWaitStrategy(10, MILLISECONDS);

    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
        LongEvent.FACTORY, 1024, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, waitStrategy
    );
    disruptor.handleEventsWith(new SimpleEventHandler());

    RingBuffer<LongEvent> ringBuffer = disruptor.start();

    // Wake the consumer explicitly when less than ~20% of the buffer remains.
    long remainingThreshold = Math.round(ringBuffer.getBufferSize() * 0.2);
    if (ringBuffer.remainingCapacity() < remainingThreshold)
    {
        waitStrategy.signal();
    }

    ringBuffer.publishEvent(new EventTranslator<LongEvent>()
    {
        @Override
        public void translateTo(LongEvent event, long sequence)
        {
            // Whatever publishing is required.
            event.set(sequence + 10);
        }
    });
}
A potential problem I see with this approach is the situation where the consumer can't keep up, so that the queue is basically always full. In that case, producers compete for the lock on every publish, even if the consumer is not waiting but busy handling events, and signaling the event handler does not bring any benefit as the consumer is already "awake". Extending
If the queue is full most of the time, then I think that most of the time would be spent waiting for space to become available, rather than lost to signalling. I'd need to do some profiling to check, but I think the full queue will naturally throttle access to the mutex by the publishers.
Not if events are dropped when the queue is full. That pattern is commonly used by agents, which would rather drop events than stall application threads.
True, in that case you might want to move the signal call after the attempt to publish and only signal if successful. Using the LiteBlockingWaitStrategy is another idea. I haven't managed to convince myself that it is completely correct; I've not done heavy stress testing of it yet, hence it is marked as experimental.
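A rough sketch of that suggestion, reusing the SleepToCapacityWaitStrategy and LongEvent from the earlier snippet (the 20% threshold is just an example value): attempt a non-blocking publish with tryPublishEvent, which drops the event instead of blocking when the buffer is full, and signal only after a successful publish.

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;

// Sketch only: publish without blocking and wake the consumer only when the
// publish succeeded and capacity is getting low.
final class DropOnFullPublisher
{
    static void publishOrDrop(RingBuffer<LongEvent> ringBuffer,
                              SleepToCapacityWaitStrategy waitStrategy,
                              EventTranslator<LongEvent> translator)
    {
        // tryPublishEvent returns false (and the event is dropped) when the buffer is full.
        boolean published = ringBuffer.tryPublishEvent(translator);

        if (published && ringBuffer.remainingCapacity() < ringBuffer.getBufferSize() / 5)
        {
            waitStrategy.signal(); // signal only after a successful publish
        }
    }
}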
Have you had a look at this approach: census-instrumentation/opencensus-java#1618? I find it very interesting. However, it currently only works if there is just one consumer. If there were a thread-safe list of threads which register themselves when waiting and unregister when done waiting, that approach could also work for the general use case. It could also be combined with periodic wake-ups and checking the capacity before signaling.
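A rough sketch of that generalisation, assuming a thread-safe registry of waiting consumer threads and LockSupport.park/unpark. It has not been vetted against the lost-wakeup problem discussed below (the bounded park is only a safety net), so treat it purely as an illustration of the idea.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WaitStrategy;

// Hypothetical sketch: consumers register themselves before parking, and the
// producer unparks every registered thread when it signals.
public final class ParkingWaitStrategy implements WaitStrategy
{
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    @Override
    public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException
    {
        long availableSequence;
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();

            Thread current = Thread.currentThread();
            waiters.add(current);                    // register before re-checking to narrow the race window
            if (dependentSequence.get() < sequence)
            {
                LockSupport.parkNanos(1_000_000L);   // bounded park as a safety net against missed signals
            }
            waiters.remove(current);
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
        Thread t;
        while ((t = waiters.poll()) != null)
        {
            LockSupport.unpark(t);
        }
    }
}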
Having very CPU-friendly wait strategies is crucial for these kinds of observability use cases. Losing some events and latency spikes are not a big deal as long as the producer is not blocked.
I had a look at the census approach and it looks very similar to the LiteBlockingWaitStrategy, except that it goes directly to LockSupport.{park,unpark} rather than using j.u.c.Lock or synchronized blocks. I would expect its load profile to be the same. I'm not against these solutions; however, we did once have a similar implementation contributed as an optimisation of the blocking wait strategy, and it had a bug that meant the wake-up was missed, resulting in a latency spike 6,000 times larger than our mean for that service (6s vs 1ms). That has made me nervous of implementations that try to elide the wake-ups, because it is tricky to get right. So be aware that you are using them at your own risk. That said, for the observability case it may be the most appropriate.

In our systems, for these use cases, we take an absolute view with regard to blocking the producer: we want it to never happen, so we use a sleeping-based strategy and pay the additional CPU cost. We find that most of our event flow is very bursty. When we get a big increase in traffic for a period of time due to some external factor, queues will fill up, then the flow will drop off and the consumers will catch up, leaving the queues empty. Then another burst will arrive.

With something like the LiteBlockingWaitStrategy, the unpark cost will be paid right at the front of the incoming burst of traffic, potentially delaying everything behind it. While it will be faster than the normal BlockingWaitStrategy, the signalling cost will hit at the point where it is least desirable. Even the threshold-based implementation will probably signal occasionally, most likely when a very large burst of activity hits the system, which is probably the time you least want it. This pushes us further toward the sleeping approach. However, on your own system, YMMV.
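For completeness, wiring the built-in SleepingWaitStrategy mentioned above looks roughly like this (LongEvent and its FACTORY are assumed from the earlier snippet; the handler is a placeholder):

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

// The consumer spins, yields, then sleeps briefly on its own, so producers
// never signal anything and are never blocked on a mutex.
public class SleepingExample
{
    public static void main(String[] args)
    {
        Disruptor<LongEvent> disruptor = new Disruptor<>(
            LongEvent.FACTORY,
            1024,
            DaemonThreadFactory.INSTANCE,
            ProducerType.MULTI,
            new SleepingWaitStrategy()   // default spin/yield/sleep behaviour
        );

        EventHandler<LongEvent> handler = (event, sequence, endOfBatch) -> { /* handle event */ };
        disruptor.handleEventsWith(handler);
        disruptor.start();
    }
}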
First, I would like to say congrats on implementing a very good library.
Here is what I am trying to do: I want to implement a strategy similar to the current TimeoutBlockingWaitStrategy (in my case I have only one consumer). In other words, I want to batch all the events for X ms but try to avoid blocking the producer threads, by signaling the consumer thread as soon as the capacity is low enough that blocking could happen.
Currently the WaitStrategy interface has a "signalAllWhenBlocking" method that is called every time the cursor advances in the publish method. It would be good to have another method on the WaitStrategy, "signalAllWithRemainingSize(remainingSize)", which gets called when next is called.

If I am asking for something that Disruptor does not want to offer, let me know; I am happy to analyze other possibilities, or if I can achieve this in a different way I am happy to hear other options.
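For clarity, a hypothetical sketch of the requested hook (this method does not exist in the Disruptor API; the name comes from the proposal above):

import com.lmax.disruptor.WaitStrategy;

// Hypothetical only: what the proposed extension point might look like.
// The sequencer would call signalAllWithRemainingSize from next()/tryNext()
// with the current remaining capacity, letting a strategy wake its consumer
// early once capacity drops below a chosen threshold.
interface CapacityAwareWaitStrategy extends WaitStrategy
{
    void signalAllWithRemainingSize(long remainingSize);
}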
I am using the Disruptor in the opencensus library, https://opencensus.io/. Thanks in advance for your answer.