Skip to content

Commit

Permalink
Revert "epoll: fix up lock ordering issues"
Browse files Browse the repository at this point in the history
This reverts commit 4eb729e. Fixes cloudius-systems#448.

Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
  • Loading branch information
Pekka Enberg committed Aug 7, 2014
1 parent 3846204 commit 408a982
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 122 deletions.
14 changes: 8 additions & 6 deletions bsd/sys/kern/sys_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,29 +274,31 @@ socket_file::poll_sync(struct pollfd& pfd, timeout_t timeout)
return !!revents;
}

void socket_file::epoll_add(epoll_ptr ep)
void socket_file::epoll_add()
{
SOCK_LOCK(so);
file::epoll_add(ep);
assert(!f_lock.owned());
so->so_rcv.sb_flags |= SB_SEL;
so->so_snd.sb_flags |= SB_SEL;
WITH_LOCK(f_lock) {
if (so->so_nc) {
so->so_nc->add_epoll(ep);
for (auto&& ep : *f_epolls) {
so->so_nc->add_epoll(ep);
}
}
}
SOCK_UNLOCK(so);
}

void socket_file::epoll_del(epoll_ptr ep)
void socket_file::epoll_del()
{
SOCK_LOCK(so);
assert(!f_lock.owned());
file::epoll_del(ep);
WITH_LOCK(f_lock) {
if (so->so_nc) {
so->so_nc->del_epoll(ep);
for (auto&& ep : *f_epolls) {
so->so_nc->del_epoll(ep);
}
}
}
SOCK_UNLOCK(so);
Expand Down
180 changes: 91 additions & 89 deletions core/epoll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,7 @@ struct registered_epoll : epoll_event {
};

class epoll_file final : public special_file {

// lock ordering (fp == some file being polled):
// f_lock > fp->f_lock
// fp->f_lock > _activity_lock
// we never hold both f_lock and activity_lock.

// protected by f_lock:
std::unordered_map<epoll_key, registered_epoll> map;
mutex _activity_lock;
// below, all protected by _activity_lock:
std::unordered_set<epoll_key> _activity;
waitqueue _waiters;
ring_spsc<epoll_key, 256> _activity_ring;
Expand All @@ -87,26 +78,30 @@ class epoll_file final : public special_file {
{
}
virtual int close() override {
WITH_LOCK(f_lock) {
for (auto& e : map) {
e.first._file->epoll_del({ this, e.first });
}
for (auto& e : map) {
remove_me(e.first);
}
return 0;
}
int add(epoll_key key, struct epoll_event *event)
{
auto fp = key._file;
WITH_LOCK(f_lock) {
if (map.count(key)) {
return EEXIST;
WITH_LOCK(fp->f_lock) {
WITH_LOCK(f_lock) {
if (map.count(key)) {
return EEXIST;
}
// I used poll_wake_count-1, to ensure EPOLLET returns once when
// registering an epoll after data is already available.
map.emplace(key,
registered_epoll(*event, fp->poll_wake_count - 1));
if (!fp->f_epolls) {
fp->f_epolls.reset(new std::vector<epoll_ptr>);
}
fp->f_epolls->push_back(epoll_ptr{this, key});
}
// I used poll_wake_count-1, to ensure EPOLLET returns once when
// registering an epoll after data is already available.
map.emplace(key,
registered_epoll(*event, fp->poll_wake_count - 1));
fp->epoll_add({ this, key});
}
fp->epoll_add();
if (fp->poll(events_epoll_to_poll(event->events))) {
wake(key);
}
Expand All @@ -115,28 +110,30 @@ class epoll_file final : public special_file {
int mod(epoll_key key, struct epoll_event *event)
{
auto fp = key._file;
WITH_LOCK(f_lock) {
try {
map.at(key) = registered_epoll(*event, fp->poll_wake_count - 1);
} catch (std::out_of_range &e) {
return ENOENT;
WITH_LOCK(fp->f_lock) {
WITH_LOCK(f_lock) {
try {
map.at(key) = registered_epoll(*event, fp->poll_wake_count - 1);
} catch (std::out_of_range &e) {
return ENOENT;
}
}
fp->epoll_add({ this, key });
}
fp->epoll_add();
if (fp->poll(events_epoll_to_poll(event->events))) {
wake(key);
}
return 0;
}
int del(epoll_key key)
{
WITH_LOCK(f_lock) {
if (map.erase(key)) {
key._file->epoll_del({ this, key });
return 0;
} else {
return ENOENT;
}
std::unique_lock<mutex> lock(f_lock);
if (map.erase(key)) {
lock.unlock();
remove_me(key);
return 0;
} else {
return ENOENT;
}
}
int wait(struct epoll_event *events, int maxevents, int timeout_ms)
Expand All @@ -147,11 +144,11 @@ class epoll_file final : public special_file {
tmr.set(*tmo);
}
int nr = 0;
WITH_LOCK(_activity_lock) {
WITH_LOCK(f_lock) {
while (!tmr.expired() && nr == 0) {
if (tmo) {
_activity_ring_owner.reset(*sched::thread::current());
sched::thread::wait_for(_activity_lock,
sched::thread::wait_for(f_lock,
_waiters,
tmr,
[&] { return !_activity.empty(); },
Expand All @@ -163,12 +160,50 @@ class epoll_file final : public special_file {

flush_activity_ring();
// We need to drop f_lock file calling file::poll(), so move _activity to
// local storage for processing. Since _activity_mutex is internal to
// f_lock, we need to drop it as well.
// local storage for processing
auto activity = std::move(_activity);
assert(_activity.empty());
DROP_LOCK(_activity_lock) {
nr = process_activity(activity, events, maxevents);
auto i = activity.begin();
while (i != activity.end() && nr < maxevents) {
epoll_key key = *i;
auto found = map.find(key);
auto cur = i++;
if (found == map.end()) {
activity.erase(cur);
continue; // raced
}
registered_epoll r_e = found->second;
int active = 0;
if (r_e.events) {
DROP_LOCK(f_lock) {
active = key._file->poll(events_epoll_to_poll(r_e.events));
}
}
active = events_poll_to_epoll(active);
if (!active || (r_e.events & EPOLLET)) {
activity.erase(cur);
} else {
DROP_LOCK(f_lock) {
key._file->epoll_add();
}
}
if (!active) {
continue;
}
if (r_e.events & EPOLLONESHOT) {
// since we dropped the lock, the key may not be there anymore
auto i = map.find(key);
if (i != map.end()) {
i->second.events = 0;
DROP_LOCK(f_lock) {
key._file->epoll_del();
}
}
}
trace_epoll_ready(key._fd, key._file, active);
events[nr].data = r_e.data;
events[nr].events = active;
++nr;
}
// move back !EPOLLET back to main storage
if (_activity.empty()) {
Expand All @@ -179,52 +214,7 @@ class epoll_file final : public special_file {
std::move(activity.begin(), activity.end(),
std::inserter(_activity, _activity.begin()));
}
if (!tmo) {
break;
}
}
}
return nr;
}
int process_activity(std::unordered_set<epoll_key>& activity,
epoll_event* events, int maxevents) {
int nr = 0;
WITH_LOCK(f_lock) {
auto i = activity.begin();
while (i != activity.end() && nr < maxevents) {
epoll_key key = *i;
auto found = map.find(key);
auto cur = i++;
if (found == map.end()) {
activity.erase(cur);
continue; // raced
}
registered_epoll r_e = found->second;
int active = 0;
if (r_e.events) {
active = key._file->poll(events_epoll_to_poll(r_e.events));
}
active = events_poll_to_epoll(active);
if (!active || (r_e.events & EPOLLET)) {
activity.erase(cur);
} else {
key._file->epoll_add({ this, key });
}
if (!active) {
continue;
}
if (r_e.events & EPOLLONESHOT) {
// since we dropped the lock, the key may not be there anymore
auto i = map.find(key);
if (i != map.end()) {
i->second.events = 0;
key._file->epoll_del({ this, key });
}
}
trace_epoll_ready(key._fd, key._file, active);
events[nr].data = r_e.data;
events[nr].events = active;
++nr;
return nr;
}
}
return nr;
Expand All @@ -243,13 +233,13 @@ class epoll_file final : public special_file {
// events on _activity_ring only wake up one waiter, so wake up all the rest now.
// we need to do this even if no events were received, since if we exit, then
// _activity_ring_owner will remain unset.
_waiters.wake_all(_activity_lock);
_waiters.wake_all(f_lock);
}
void wake(epoll_key key) {
WITH_LOCK(_activity_lock) {
WITH_LOCK(f_lock) {
auto ins = _activity.insert(key);
if (ins.second) {
_waiters.wake_all(_activity_lock);
_waiters.wake_all(f_lock);
}
}
}
Expand All @@ -259,6 +249,18 @@ class epoll_file final : public special_file {
}
_activity_ring_owner.wake();
}
private:
void remove_me(epoll_key key) {
auto fp = key._file;
WITH_LOCK(fp->f_lock) {
epoll_ptr ptr{this, key};
auto i = boost::range::find(*fp->f_epolls, ptr);
// may race with a concurrent remove_me(), since we're not holding f_lock:
if (i != fp->f_epolls->end()) {
fp->f_epolls->erase(i);
}
}
}
};

int epoll_create(int size)
Expand Down
23 changes: 0 additions & 23 deletions fs/vfs/kern_descrip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include <osv/debug.h>
#include <osv/mutex.h>
#include <osv/rcu.hh>
#include <boost/range/algorithm/find.hpp>

#include <bsd/sys/sys/queue.h>

Expand Down Expand Up @@ -218,28 +217,6 @@ void file::stop_polls()
}
}

void file::epoll_add(epoll_ptr ep)
{
WITH_LOCK(f_lock) {
if (!f_epolls) {
f_epolls.reset(new std::vector<epoll_ptr>);
}
if (boost::range::find(*f_epolls, ep) == f_epolls->end()) {
f_epolls->push_back(ep);
}
}
}

void file::epoll_del(epoll_ptr ep)
{
WITH_LOCK(f_lock) {
assert(f_epolls);
auto i = boost::range::find(*f_epolls, ep);
assert(i != f_epolls->end());
f_epolls->erase(i);
}
}

dentry* file_dentry(file* fp)
{
return fp->f_dentry.get();
Expand Down
4 changes: 2 additions & 2 deletions include/osv/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ struct file {
virtual int stat(struct stat* buf) = 0;
virtual int close() = 0;
virtual int chmod(mode_t mode) = 0;
virtual void epoll_add(epoll_ptr ep);
virtual void epoll_del(epoll_ptr ep);
virtual void epoll_add() {}
virtual void epoll_del() {}
virtual void poll_install(pollreq& pr) {}
virtual void poll_uninstall(pollreq& pr) {}
virtual std::unique_ptr<mmu::file_vma> mmap(addr_range range, unsigned flags, unsigned perm, off_t offset) {
Expand Down
4 changes: 2 additions & 2 deletions include/osv/socket.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public:
virtual int stat(struct stat* buf) override;
virtual int close() override;
virtual int chmod(mode_t mode) override;
virtual void epoll_add(epoll_ptr ep) override;
virtual void epoll_del(epoll_ptr ep) override;
virtual void epoll_add() override;
virtual void epoll_del() override;
virtual void poll_install(pollreq& pr) override;
virtual void poll_uninstall(pollreq& pr) override;
int bsd_ioctl(u_long cmd, void* data);
Expand Down

0 comments on commit 408a982

Please sign in to comment.