Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace List with Pairing Heap in Task Queue #584

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 349 additions & 0 deletions src/internal_modules/roc_core/pairing_heap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
/*
* Copyright (c) 2015 Roc Streaming authors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: please update copyrights in newly added files to 2023.

*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/pairing_heap.h
//! @brief Intrusive pairing heap.

#ifndef ROC_CORE_PAIRING_HEAP_H_
#define ROC_CORE_PAIRING_HEAP_H_

#include "roc_core/noncopyable.h"
#include "roc_core/ownership_policy.h"
#include "roc_core/pairing_heap_node.h"
#include "roc_core/panic.h"
#include "roc_core/stddefs.h"

namespace roc {
namespace core {

//! Intrusive pairing heap.
//!
//! Does not perform allocations.
//! Provides O(1) size check, membership check, insertion, and removal.
//!
//! @tparam T defines object type, it should inherit PairingHeapNode.
//!
//! @tparam OwnershipPolicy defines ownership policy which is used to acquire an
//! element ownership when it's added to the pairing heap and release ownership when it's
//! removed from the pairing heap.
template <class T, template <class TT> class OwnershipPolicy = RefCountedOwnership>
class PairingHeap : public NonCopyable<> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface of this class is quite low-level as it reflects the underlying algorithm.

I suggest to use different approach: provide much more narrow interface which is actually needed for the user.

What we need in roc_ctl is a classic priority queue with just three operations:

  • push (insert element according to its priority)
  • top (look up element with highest priority)
  • pop (remove element with highest priority)

So I suggest to rename this class to PriorityQueue and hide all internals behind these high-level operations.

Benefits:

  • interface will be much simpler to understand and use, and there will be less bugs in usage
  • interface will depend on algorithm properties, but not on specific algorithm; we will state that it has cheap push and more expensive pop, but we won't attach caller to specific data structure and can change it if needed

To achieve this, we'll need to tell PriorityQueue how it can get priority of T. We can use the same approach as we use in hashmap: T will have to implement necessary methods, for example:

   // get object priority
   Prio prio() const;

   // compare two priorities (return -1, 0, or 1)
   static int prio_cmp(Prio prio1, Prio prio2);

(See also comments in Hashmap).

I think it would be pretty convenient, since this is intrusive data structure, and we anyway place special requirements on elements (inherit from node class).

What do you think?

Copy link
Author

@Hassan-A Hassan-A Oct 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gavv I will try! 😅

questions:

* pop (remove element with highest priority) 

Are we assuming we never remove / change deadline of elements other than the top one? Since there is ControlTaskQueue::cancel_task_ function, I wasn't too sure.

 static int prio_cmp(Prio prio1, Prio prio2);

If this returns 0 I'm assuming priority is equal. Do we need to make sure which task goes first? or can we do either one. This came up because when I was testing with:
TEST(task_queue, schedule_at_same_deadline) {...}
it assumes the task that was scheduled first would be in front of the other one. However I don't think we can guarantee this using a heap because of how heap merge works.

Example of benchmarks for control task queue is here: https://github.com/roc-streaming/roc-toolkit/blob/master/src/tests/roc_ctl/bench_task_queue_contention.cpp

my contention test freezes here:
contention
is it just my side?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good questions,

Are we assuming we never remove / change deadline of elements other than the top one? Since there is ControlTaskQueue::cancel_task_ function, I wasn't too sure.

My bad, we indeed need to remove tasks. The interface then can be:

  • void push(T&)
  • T* top()
  • void pop()
  • bool contains(T&)
  • void remove(T&)

BTW, it would be better to use naming similar to List and Hashmap:

  • void insert(T&)
  • T* front()
  • void pop_front()
  • bool contains(T&)
  • void remove(T&)

If this returns 0 I'm assuming priority is equal.

Yep, like in strcmp() and memcmp(). Another option is to make it static bool prio_less(Prio prio1, Prio prio2) and return true if prio1 < prio2. Choose whatever you think fits better.

Do we need to make sure which task goes first? or can we do either one.

I think it would be useful to have this guarantee, because it will prevent some easy to make bugs in user code.

Imagine there is a code that calculates next deadline for some event (e.g. periodic timer), and then runs some logic that schedules a few tasks for that deadline, like:

next_deadline_ = ...;

...

if (need_x) {
   schedule_at(next_deadline_, task_x);

   ...

   if (also_need_y) {
      schedule_at(next_deadline_, task_y);
   }
}

It would be easy to forget that order of task_x and task_y is not guaranteed, especially given that it's guaranteed for regular schedule (without deadline).

However I don't think we can guarantee this using a heap because of how heap merge works.

But how do you handle it currently? I guess it works somehow because tests pass on this PR?

Also, if needed, we could upgrade PairingHeap (PriorityQueue) so that each node will contain a list of nodes with the same priority. When you add node, and another node with same prio already exists, it will be added to that list. And the list will maintain insertion order.

my contention test freezes here:

This indicates a bug in benchmarks I think. They work on my machine and a few others where I tested them. (BTW they take quite a lot of time).

It would be great if you can debug the reason of the freeze because I can't reproduce it. Anyway it should be done separately from this PR I think.

public:
//! Pointer type.
//! @remarks
//! either raw or smart pointer depending on the ownership policy.
typedef typename OwnershipPolicy<T>::Pointer Pointer;

//! Initialize empty pairing heap.
PairingHeap()
: size_(0) {
root_.leftmost_child = &root_;
root_.prev = &root_;
root_.next = &root_;
root_.pairing_heap = this;
}

//! Parse through pairing heap to release ownership of containing objects.
~PairingHeap() {
if ((root_.leftmost_child != &root_ && root_.leftmost_child != NULL)
|| size_ > 0) {
ReleasePairingHeapNode(root_.leftmost_child);
}

root_.pairing_heap = NULL;
}

//! Release ownership of containing objects.
void ReleasePairingHeapNode(PairingHeapNode::PairingHeapNodeData* data) {
roc_panic_if(data == NULL);
check_is_member_(data, this);

if (data->leftmost_child != NULL) {
ReleasePairingHeapNode(data->leftmost_child);
}

if (data->next != NULL) {
ReleasePairingHeapNode(data->next);
}

data->pairing_heap = NULL;

OwnershipPolicy<T>::release(*container_of_(data));
}

//! Get number of elements in pairing heap.
size_t size() const {
return size_;
}

//! Check if element belongs to pairing heap.
bool contains(const T& element) {
const PairingHeapNode::PairingHeapNodeData* data =
element.pairing_heap_node_data();
return (data->pairing_heap == this);
}

//! Get first pairing heap element.
//! @returns
//! first element or NULL if pairing heap is empty.
Pointer top() const {
if (size_ == 0) {
return NULL;
}
return container_of_(root_.leftmost_child);
}

//! Get pairing heap element next to given one.
//!
//! @returns
//! pairing heap element following @p element if @p element is not
//! the last sibling, or NULL otherwise.
//!
//! @pre
//! @p element should be member of this pairing heap.
Pointer next_sibling_of(T& element) const {
PairingHeapNode::PairingHeapNodeData* data = element.pairing_heap_node_data();
check_is_member_(data, this);

if (data->next == NULL) {
return NULL;
}
return container_of_(data->next);
}

//! Get pairing heap element previous to given one.
//!
//! @returns
//! pairing heap element before @p element if @p element has a previous element, or
//! NULL otherwise.
//!
//! @pre
//! @p element should be member of this pairing heap.
Pointer prev_sibling_of(T& element) const {
PairingHeapNode::PairingHeapNodeData* data = element.pairing_heap_node_data();
check_is_member_(data, this);

if (data->prev == NULL) {
return NULL;
}
return container_of_(data->prev);
}

//! Get pairing heap element child of given one.
//!
//! @returns
//! pairing heap element child of @p element if @p element has a child element, or
//! NULL otherwise.
//!
//! @pre
//! @p element should be member of this pairing heap.
Pointer child_of(T& element) const {
PairingHeapNode::PairingHeapNodeData* data = element.pairing_heap_node_data();
check_is_member_(data, this);

if (data->leftmost_child == NULL) {
return NULL;
}
return container_of_(data->leftmost_child);
}

//! Inserts first element to pairing heap.
//!
//! @remarks
//! - appends @p element to pairing heap
//! - acquires ownership of @p element
//!
//! @pre
//! @p element should not be member of any pairing heap and should be the first
//! element in the heap.
void push(T& element) {
insert_as_child(element, NULL);
}

//! Insert element into pairing heap as a child of an existing element.
//!
//! @remarks
//! - inserts @p new_child as a child of @p parent element
//! - acquires ownership of @p new_child
//!
//! @pre
//! @p new_child should not be member of any pairing heap.
//! @p parent should be member of this pairing heap.
void push_as_child(T& new_child, T& parent) {
insert_as_child(new_child, &parent);
}

//! Insert element into pairing heap as a parent of an existing element.
//!
//! @remarks
//! - inserts @p new_parent as parent of @p child element
//! - acquires ownership of @p new_parent
//!
//! @pre
//! @p new_parent should not be member of any pairing heap.
//! @p child should be member of this pairing heap.
void push_as_parent(T& new_parent, T& child) {
insert_as_parent(new_parent, &child);
}

//! Merge two pairing heap elements by parenting first given element to second given
//! element.
//!
//! @returns
//! the @p parent_element pairing heap element
//!
//! @pre
//! @p parent_element and @p child_element should be members of this pairing heap.
Pointer merge(T& parent_element, T& child_element) const {
PairingHeapNode::PairingHeapNodeData* parent =
parent_element.pairing_heap_node_data();
PairingHeapNode::PairingHeapNodeData* child =
child_element.pairing_heap_node_data();

check_is_member_(parent, this);
check_is_member_(child, this);

if (child->prev->leftmost_child == child) {
child->prev->leftmost_child = parent;
parent->prev = child->prev;
child->prev = parent;
} else {
if (parent->next == child) {
parent->next = child->next;
if (child->next != NULL) {
child->next->prev = parent;
}
} else {
parent->prev = child->prev;
}
}

if (parent->leftmost_child != NULL) {
parent->leftmost_child->prev = child;
}

child->next = parent->leftmost_child;
parent->leftmost_child = child;

return container_of_(parent);
}

//! Remove element from pairing heap.
//!
//! @remarks
//! - removes @p element from pairing heap
//! - releases ownership of @p element
//!
//! @pre
//! @p element should be member of this pairing heap.
void remove(T& element) {
PairingHeapNode::PairingHeapNodeData* data = element.pairing_heap_node_data();
check_is_member_(data, this);

PairingHeapNode::PairingHeapNodeData* data_child = data->leftmost_child;

if (data_child != NULL) {
data_child->prev = data->prev;
data_child->next = data->next;

if (data->prev->leftmost_child == data) {
data->prev->leftmost_child = data_child;
} else {
data->prev->next = data_child;
}
} else {
if (data->prev->leftmost_child == data) {
data->prev->leftmost_child = data->next;
} else {
if (data->prev == &root_) {
data->prev->next = &root_;
} else {
data->prev->next = data->next;
}
}
}

data->pairing_heap = NULL;

size_--;

OwnershipPolicy<T>::release(element);
}

private:
static inline T* container_of_(PairingHeapNode::PairingHeapNodeData* data) {
return static_cast<T*>(data->container_of());
}

static void check_is_member_(const PairingHeapNode::PairingHeapNodeData* data,
const PairingHeap* pairing_heap) {
if (data->pairing_heap != pairing_heap) {
roc_panic("pairing heap element is member of wrong pairing heap: expected "
"%p, got %p",
(const void*)pairing_heap, (const void*)data->pairing_heap);
}
}

void insert_as_child(T& new_child, T* parent) {
PairingHeapNode::PairingHeapNodeData* data_new =
new_child.pairing_heap_node_data();
check_is_member_(data_new, NULL);

PairingHeapNode::PairingHeapNodeData* data_parent;
if (parent != NULL) {
data_parent = parent->pairing_heap_node_data();
check_is_member_(data_parent, this);

if (data_parent->leftmost_child != NULL) {
data_new->next = data_parent->leftmost_child;
data_parent->leftmost_child->prev = data_new;
}

} else {
data_parent = &root_;
}

data_new->prev = data_parent;
data_parent->leftmost_child = data_new;

data_new->pairing_heap = this;

size_++;

OwnershipPolicy<T>::acquire(new_child);
}

void insert_as_parent(T& new_parent, T* child) {
PairingHeapNode::PairingHeapNodeData* data_new =
new_parent.pairing_heap_node_data();
check_is_member_(data_new, NULL);

PairingHeapNode::PairingHeapNodeData* data_child =
child->pairing_heap_node_data();

data_new->prev = data_child->prev;
data_new->leftmost_child = data_child;

data_child->prev->leftmost_child = data_new;
data_child->prev = data_new;

data_new->pairing_heap = this;

size_++;

OwnershipPolicy<T>::acquire(new_parent);
}

PairingHeapNode::PairingHeapNodeData root_;
size_t size_;
};

} // namespace core
} // namespace roc

#endif // ROC_CORE_PAIRING_HEAP_H_
Loading
Loading