
Proposing a small change in the WaitStrategy #246

Closed
bogdandrutu opened this issue Nov 26, 2018 · 9 comments

@bogdandrutu

First, I would like to say congrats on implementing a very good library.

Here is what I am trying to do: I want to implement a strategy similar to the current TimeoutBlockingWaitStrategy that does the following (in my case I have only one consumer):

  • Process items every X ms (sleep for X ms after I process all the items). This allows me to batch events and process them together (it also avoids keeping the consumer thread running).
  • If, while sleeping, the remaining capacity drops under Y% (e.g. less than 20%), signal the thread to wake up; otherwise leave the thread sleeping (to avoid blocking producers).

In other words, I want to batch all the events for X ms, but avoid blocking the producer threads by signaling the consumer thread as soon as capacity is low enough that blocking could happen.

Currently the WaitStrategy interface has a "signalAllWhenBlocking" method that is called every time the cursor advances in the publish method. It would be good to have another method on the WaitStrategy, "signalAllWithRemainingSize(remainingSize)", which gets called when next is called.
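
For illustration, a minimal sketch of what such an addition could look like; signalAllWithRemainingSize is the proposed (hypothetical) method and is not part of the current Disruptor API:

    import com.lmax.disruptor.WaitStrategy;

    // Hypothetical extension of the WaitStrategy contract: the publisher side
    // reports the remaining capacity (e.g. from next()) so a strategy can decide
    // whether to wake the consumer early.
    public interface CapacityAwareWaitStrategy extends WaitStrategy
    {
        void signalAllWithRemainingSize(long remainingSize);
    }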

If I am asking for something that Disruptor does not want to offer, let me know and I am happy to analyze other possibilities; if I can achieve this in a different way, I am happy to hear other options.

I am using the Disruptor in the OpenCensus library (https://opencensus.io/). Thanks in advance for your answer.

@mikeb01
Contributor

mikeb01 commented Nov 27, 2018

I'll have a dig into the best way to do this. I think it would be reasonably easy to do with a custom wait strategy, without changing the Disruptor itself. One question, though: do you want to know the real capacity (or something very close to it)? At the moment the ring buffer caches the consumer sequence and only re-fetches the latest value when the claimed sequence wraps the cached value. This is an important optimisation, which I don't really want to lose.
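
For context, an illustrative sketch (not the actual Disruptor source) of the caching behaviour described above: the producer keeps a cached copy of the consumer sequence and only pays for a fresh read when the next claim would wrap past it.

    import com.lmax.disruptor.Sequence;

    // Illustrative only: the producer tracks the consumer's progress lazily.
    final class CachedGatingSketch
    {
        private final int bufferSize;
        private long cachedGatingSequence = -1;

        CachedGatingSketch(final int bufferSize)
        {
            this.bufferSize = bufferSize;
        }

        boolean hasAvailableCapacity(final long nextSequence, final Sequence gatingSequence)
        {
            final long wrapPoint = nextSequence - bufferSize;
            if (wrapPoint > cachedGatingSequence)
            {
                // Only now re-fetch the consumer sequence (a volatile read).
                cachedGatingSequence = gatingSequence.get();
            }
            return wrapPoint <= cachedGatingSequence;
        }
    }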

@mikeb01
Contributor

mikeb01 commented Nov 27, 2018

Would something like the following work for you? You can reuse the TimeoutBlockingWaitStrategy by extension, but skip the typical signalling and only signal at your required threshold.

    // Imports for this sketch (the members below live inside an enclosing class).
    import java.util.concurrent.TimeUnit;
    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    import com.lmax.disruptor.EventTranslator;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    import com.lmax.disruptor.util.DaemonThreadFactory;

    private static class SleepToCapacityWaitStrategy extends TimeoutBlockingWaitStrategy
    {
        public SleepToCapacityWaitStrategy(final long timeout, final TimeUnit units)
        {
            super(timeout, units);
        }

        @Override
        public void signalAllWhenBlocking()
        {
            // Ignore the signal that would otherwise fire on every publish.
        }

        public void signal()
        {
            // Explicit wake-up, called by the producer when capacity runs low.
            super.signalAllWhenBlocking();
        }
    }

    public static void main(String[] args)
    {
        SleepToCapacityWaitStrategy waitStrategy = new SleepToCapacityWaitStrategy(10, MILLISECONDS);

        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
            LongEvent.FACTORY, 1024, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, waitStrategy
        );

        disruptor.handleEventsWith(new SimpleEventHandler());

        RingBuffer<LongEvent> ringBuffer = disruptor.start();
        long remainingThreshold = Math.round(ringBuffer.getBufferSize() * 0.2);

        // Wake the consumer early if the remaining capacity has dropped below the threshold.
        if (ringBuffer.remainingCapacity() < remainingThreshold)
        {
            waitStrategy.signal();
        }

        ringBuffer.publishEvent(new EventTranslator<LongEvent>()
        {
            @Override
            public void translateTo(LongEvent event, long sequence)
            {
                // Whatever publishing required.
                event.set(sequence + 10);
            }
        });
    }

@felixbarny

A potential problem I see with this approach is situations where the consumer can't keep up, so the queue is basically always full. In that case, producers compete for the lock on every publish, even if the consumer is not waiting but busy handling events, and signaling the event handler does not bring any benefit since the consumer is already "awake". Extending LiteTimeoutBlockingWaitStrategy might fix that problem, as it only signals if the consumer thread is actually waiting.
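
A hedged sketch of that suggestion, assuming LiteTimeoutBlockingWaitStrategy is extensible in the Disruptor version in use (the lite variant tracks whether a consumer is actually parked, so redundant signals become cheap no-ops):

    import java.util.concurrent.TimeUnit;

    import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;

    // Same shape as SleepToCapacityWaitStrategy above, but built on the lite
    // variant so signalling is only expensive when the consumer is actually waiting.
    class LiteSleepToCapacityWaitStrategy extends LiteTimeoutBlockingWaitStrategy
    {
        LiteSleepToCapacityWaitStrategy(final long timeout, final TimeUnit units)
        {
            super(timeout, units);
        }

        @Override
        public void signalAllWhenBlocking()
        {
            // Skip the per-publish signal; the producer calls signal() explicitly.
        }

        public void signal()
        {
            super.signalAllWhenBlocking();
        }
    }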

@mikeb01
Contributor

mikeb01 commented Jan 24, 2019

If the queue is full most of the time, then I think most of the time would be spent waiting for space to become available, rather than lost signaling. I'd need to do some profiling to check, but I think the full queue will naturally throttle the publishers' access to the mutex.

@felixbarny

Not if events are dropped when the queue is full. That pattern is commonly used by agents, which would rather drop events than stall application threads.
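
As a minimal sketch of that drop-instead-of-block pattern, built on RingBuffer.tryPublishEvent (LongEvent and the droppedEvents counter are assumptions carried over from the example above):

    import java.util.concurrent.atomic.AtomicLong;

    import com.lmax.disruptor.RingBuffer;

    final class DroppingPublisher
    {
        private final RingBuffer<LongEvent> ringBuffer;
        private final AtomicLong droppedEvents = new AtomicLong();

        DroppingPublisher(final RingBuffer<LongEvent> ringBuffer)
        {
            this.ringBuffer = ringBuffer;
        }

        void publish(final long value)
        {
            // tryPublishEvent returns false instead of blocking when the buffer is full.
            if (!ringBuffer.tryPublishEvent((event, sequence) -> event.set(value)))
            {
                droppedEvents.incrementAndGet();  // drop rather than stall the application thread
            }
        }
    }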

@mikeb01
Contributor

mikeb01 commented Feb 1, 2019

True, in that case you might want to move the signal call to after the attempt to publish and only signal if successful. Using the LiteBlockingWaitStrategy is another idea. I haven't managed to convince myself that it is completely correct, and I've not done heavy stress testing of it yet - hence it is marked as experimental.
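
A sketch of that reordering, reusing the SleepToCapacityWaitStrategy, LongEvent and remainingThreshold names from the example above (all assumptions from that sketch):

    import com.lmax.disruptor.RingBuffer;

    final class PublishThenSignal
    {
        static void publish(final RingBuffer<LongEvent> ringBuffer,
                            final SleepToCapacityWaitStrategy waitStrategy,
                            final long remainingThreshold,
                            final long value)
        {
            // Attempt to publish without blocking; a full buffer simply drops the event.
            final boolean published = ringBuffer.tryPublishEvent((event, sequence) -> event.set(value));

            // Signal only for events that actually made it in, and only when capacity is low.
            if (published && ringBuffer.remainingCapacity() < remainingThreshold)
            {
                waitStrategy.signal();
            }
        }
    }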

@felixbarny

Have you had a look at this approach: census-instrumentation/opencensus-java#1618?

I find it very interesting. However, it currently only works if there is just one consumer. If there were a thread-safe list of threads that register themselves when waiting and unregister when done waiting, that approach could also work for the general use case. It could also be combined with periodic wake-ups and checking the capacity before signaling.
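
A rough sketch of that generalisation (hypothetical, not part of Disruptor): waiting consumer threads register themselves in a concurrent queue, producers unpark only the registered threads, and parkNanos provides the periodic wake-up.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;

    import com.lmax.disruptor.AlertException;
    import com.lmax.disruptor.Sequence;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.WaitStrategy;

    public final class ParkingRegistryWaitStrategy implements WaitStrategy
    {
        private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
        private final long maxParkNanos;

        public ParkingRegistryWaitStrategy(final long timeout, final TimeUnit units)
        {
            this.maxParkNanos = units.toNanos(timeout);
        }

        @Override
        public long waitFor(final long sequence, final Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
            throws AlertException, InterruptedException
        {
            long availableSequence;
            while ((availableSequence = dependentSequence.get()) < sequence)
            {
                barrier.checkAlert();

                final Thread current = Thread.currentThread();
                waiters.add(current);  // register before re-checking, so a concurrent signal is not missed
                try
                {
                    if (dependentSequence.get() < sequence)
                    {
                        LockSupport.parkNanos(maxParkNanos);  // periodic wake-up even without a signal
                    }
                }
                finally
                {
                    waiters.remove(current);  // unregister when done waiting
                }

                if (Thread.interrupted())
                {
                    throw new InterruptedException();
                }
            }
            return availableSequence;
        }

        @Override
        public void signalAllWhenBlocking()
        {
            // Only threads that are actually waiting (or about to park) are registered,
            // so this is cheap when the consumer is busy processing.
            Thread waiter;
            while ((waiter = waiters.poll()) != null)
            {
                LockSupport.unpark(waiter);
            }
        }
    }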

@felixbarny

Having very CPU-friendly wait strategies is crucial for these kinds of observability use cases. Losing some events or having latency spikes is not a big deal as long as the producer is not blocked.

@mikeb01
Contributor

mikeb01 commented Feb 6, 2019

I had a look at the census approach and it looks very similar to the LiteBlockingWaitStrategy, except that it goes directly to LockSupport.{park,unpark} rather than using a java.util.concurrent Lock or synchronized blocks. I would expect its load profile to be the same.

I'm not against these solutions. However, we did once have a similar implementation contributed as an optimisation of the blocking wait strategy, and it had a bug that caused the wake-up to be missed, resulting in a latency spike 6,000 times larger than our mean for that service (6s vs 1ms). It has made me nervous of implementations that try to elide the wake-ups, because they are tricky to get right. So be aware that you are using them at your own risk.

That said, for the observability case it may be the most appropriate. In our systems, for these use cases, we take an absolute view with regards to blocking the producer: we want it to never happen, so we use a sleeping-based strategy and pay the additional CPU cost for that.

We find that most of our event flow is very bursty. When we get a big increase of traffic for a period of time due to some external factor, queues will fill up, then the flow will drop off and the consumers will catch up, leaving the queues empty. Then another burst will arrive. With something like the LiteBlockingWaitStrategy the unpark cost will be paid right at the front of the incoming burst of traffic, potentially delaying everything behind it. While it will be faster than the normal BlockingWaitStrategy, the signalling cost will hit at the point when it is least desirable. Even the threshold-based implementation will probably signal occasionally, most likely when a very large burst of activity happens on the system, which is probably the time you least want it. This pushes us further toward the sleeping approach. However, on your own system, YMMV.
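
For reference, a minimal sketch of the configuration described above, using the stock SleepingWaitStrategy (LongEvent and SimpleEventHandler are assumptions carried over from the earlier example):

    import com.lmax.disruptor.SleepingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    import com.lmax.disruptor.util.DaemonThreadFactory;

    // Sleeping strategy: the consumer burns a little CPU while idle, but
    // publishers never take a lock or issue a wake-up signal.
    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
        LongEvent.FACTORY, 1024, DaemonThreadFactory.INSTANCE,
        ProducerType.MULTI, new SleepingWaitStrategy());

    disruptor.handleEventsWith(new SimpleEventHandler());
    disruptor.start();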

mikeb01 closed this as completed Feb 16, 2019