Skip to content

Commit

Permalink
Revert "Revert "epoll: fix up lock ordering issues""
Browse files Browse the repository at this point in the history
This reverts commit 408a982.

I never saw the build issue mentioned by Nadav. I tested this patch with
Tomcat + the NIO connector, which uses epoll intensively, and I did not see
any issue.

Signed-off-by: Asias He <[email protected]>
  • Loading branch information
asias committed Aug 28, 2014
1 parent 78691c7 commit 612f4eb
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 103 deletions.
14 changes: 6 additions & 8 deletions bsd/sys/kern/sys_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,31 +274,29 @@ socket_file::poll_sync(struct pollfd& pfd, timeout_t timeout)
return !!revents;
}

void socket_file::epoll_add()
void socket_file::epoll_add(epoll_ptr ep)
{
SOCK_LOCK(so);
file::epoll_add(ep);
assert(!f_lock.owned());
so->so_rcv.sb_flags |= SB_SEL;
so->so_snd.sb_flags |= SB_SEL;
WITH_LOCK(f_lock) {
if (so->so_nc) {
for (auto&& ep : *f_epolls) {
so->so_nc->add_epoll(ep);
}
so->so_nc->add_epoll(ep);
}
}
SOCK_UNLOCK(so);
}

void socket_file::epoll_del()
void socket_file::epoll_del(epoll_ptr ep)
{
SOCK_LOCK(so);
assert(!f_lock.owned());
file::epoll_del(ep);
WITH_LOCK(f_lock) {
if (so->so_nc) {
for (auto&& ep : *f_epolls) {
so->so_nc->del_epoll(ep);
}
so->so_nc->del_epoll(ep);
}
}
SOCK_UNLOCK(so);
Expand Down
180 changes: 89 additions & 91 deletions core/epoll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ struct registered_epoll : epoll_event {
};

class epoll_file final : public special_file {

// lock ordering (fp == some file being polled):
// f_lock > fp->f_lock
// fp->f_lock > _activity_lock
// we never hold both f_lock and _activity_lock.

// protected by f_lock:
std::unordered_map<epoll_key, registered_epoll> map;
mutex _activity_lock;
// below, all protected by _activity_lock:
std::unordered_set<epoll_key> _activity;
waitqueue _waiters;
ring_spsc<epoll_key, 256> _activity_ring;
Expand All @@ -78,30 +87,26 @@ class epoll_file final : public special_file {
{
}
virtual int close() override {
for (auto& e : map) {
remove_me(e.first);
WITH_LOCK(f_lock) {
for (auto& e : map) {
e.first._file->epoll_del({ this, e.first });
}
}
return 0;
}
int add(epoll_key key, struct epoll_event *event)
{
auto fp = key._file;
WITH_LOCK(fp->f_lock) {
WITH_LOCK(f_lock) {
if (map.count(key)) {
return EEXIST;
}
// I used poll_wake_count-1, to ensure EPOLLET returns once when
// registering an epoll after data is already available.
map.emplace(key,
registered_epoll(*event, fp->poll_wake_count - 1));
if (!fp->f_epolls) {
fp->f_epolls.reset(new std::vector<epoll_ptr>);
}
fp->f_epolls->push_back(epoll_ptr{this, key});
WITH_LOCK(f_lock) {
if (map.count(key)) {
return EEXIST;
}
// I used poll_wake_count-1, to ensure EPOLLET returns once when
// registering an epoll after data is already available.
map.emplace(key,
registered_epoll(*event, fp->poll_wake_count - 1));
fp->epoll_add({ this, key});
}
fp->epoll_add();
if (fp->poll(events_epoll_to_poll(event->events))) {
wake(key);
}
Expand All @@ -110,30 +115,28 @@ class epoll_file final : public special_file {
int mod(epoll_key key, struct epoll_event *event)
{
auto fp = key._file;
WITH_LOCK(fp->f_lock) {
WITH_LOCK(f_lock) {
try {
map.at(key) = registered_epoll(*event, fp->poll_wake_count - 1);
} catch (std::out_of_range &e) {
return ENOENT;
}
WITH_LOCK(f_lock) {
try {
map.at(key) = registered_epoll(*event, fp->poll_wake_count - 1);
} catch (std::out_of_range &e) {
return ENOENT;
}
fp->epoll_add({ this, key });
}
fp->epoll_add();
if (fp->poll(events_epoll_to_poll(event->events))) {
wake(key);
}
return 0;
}
int del(epoll_key key)
{
std::unique_lock<mutex> lock(f_lock);
if (map.erase(key)) {
lock.unlock();
remove_me(key);
return 0;
} else {
return ENOENT;
WITH_LOCK(f_lock) {
if (map.erase(key)) {
key._file->epoll_del({ this, key });
return 0;
} else {
return ENOENT;
}
}
}
int wait(struct epoll_event *events, int maxevents, int timeout_ms)
Expand All @@ -144,11 +147,11 @@ class epoll_file final : public special_file {
tmr.set(*tmo);
}
int nr = 0;
WITH_LOCK(f_lock) {
WITH_LOCK(_activity_lock) {
while (!tmr.expired() && nr == 0) {
if (tmo) {
_activity_ring_owner.reset(*sched::thread::current());
sched::thread::wait_for(f_lock,
sched::thread::wait_for(_activity_lock,
_waiters,
tmr,
[&] { return !_activity.empty(); },
Expand All @@ -160,50 +163,12 @@ class epoll_file final : public special_file {

flush_activity_ring();
// We need to drop f_lock while calling file::poll(), so move _activity to
// local storage for processing
// local storage for processing. Since _activity_lock is internal to
// f_lock, we need to drop it as well.
auto activity = std::move(_activity);
assert(_activity.empty());
auto i = activity.begin();
while (i != activity.end() && nr < maxevents) {
epoll_key key = *i;
auto found = map.find(key);
auto cur = i++;
if (found == map.end()) {
activity.erase(cur);
continue; // raced
}
registered_epoll r_e = found->second;
int active = 0;
if (r_e.events) {
DROP_LOCK(f_lock) {
active = key._file->poll(events_epoll_to_poll(r_e.events));
}
}
active = events_poll_to_epoll(active);
if (!active || (r_e.events & EPOLLET)) {
activity.erase(cur);
} else {
DROP_LOCK(f_lock) {
key._file->epoll_add();
}
}
if (!active) {
continue;
}
if (r_e.events & EPOLLONESHOT) {
// since we dropped the lock, the key may not be there anymore
auto i = map.find(key);
if (i != map.end()) {
i->second.events = 0;
DROP_LOCK(f_lock) {
key._file->epoll_del();
}
}
}
trace_epoll_ready(key._fd, key._file, active);
events[nr].data = r_e.data;
events[nr].events = active;
++nr;
DROP_LOCK(_activity_lock) {
nr = process_activity(activity, events, maxevents);
}
// move back !EPOLLET back to main storage
if (_activity.empty()) {
Expand All @@ -214,7 +179,52 @@ class epoll_file final : public special_file {
std::move(activity.begin(), activity.end(),
std::inserter(_activity, _activity.begin()));
}
return nr;
if (!tmo) {
break;
}
}
}
return nr;
}
int process_activity(std::unordered_set<epoll_key>& activity,
epoll_event* events, int maxevents) {
int nr = 0;
WITH_LOCK(f_lock) {
auto i = activity.begin();
while (i != activity.end() && nr < maxevents) {
epoll_key key = *i;
auto found = map.find(key);
auto cur = i++;
if (found == map.end()) {
activity.erase(cur);
continue; // raced
}
registered_epoll r_e = found->second;
int active = 0;
if (r_e.events) {
active = key._file->poll(events_epoll_to_poll(r_e.events));
}
active = events_poll_to_epoll(active);
if (!active || (r_e.events & EPOLLET)) {
activity.erase(cur);
} else {
key._file->epoll_add({ this, key });
}
if (!active) {
continue;
}
if (r_e.events & EPOLLONESHOT) {
// since we dropped the lock, the key may not be there anymore
auto i = map.find(key);
if (i != map.end()) {
i->second.events = 0;
key._file->epoll_del({ this, key });
}
}
trace_epoll_ready(key._fd, key._file, active);
events[nr].data = r_e.data;
events[nr].events = active;
++nr;
}
}
return nr;
Expand All @@ -233,13 +243,13 @@ class epoll_file final : public special_file {
// events on _activity_ring only wake up one waiter, so wake up all the rest now.
// we need to do this even if no events were received, since if we exit, then
// _activity_ring_owner will remain unset.
_waiters.wake_all(f_lock);
_waiters.wake_all(_activity_lock);
}
void wake(epoll_key key) {
WITH_LOCK(f_lock) {
WITH_LOCK(_activity_lock) {
auto ins = _activity.insert(key);
if (ins.second) {
_waiters.wake_all(f_lock);
_waiters.wake_all(_activity_lock);
}
}
}
Expand All @@ -249,18 +259,6 @@ class epoll_file final : public special_file {
}
_activity_ring_owner.wake();
}
private:
void remove_me(epoll_key key) {
auto fp = key._file;
WITH_LOCK(fp->f_lock) {
epoll_ptr ptr{this, key};
auto i = boost::range::find(*fp->f_epolls, ptr);
// may race with a concurrent remove_me(), since we're not holding f_lock:
if (i != fp->f_epolls->end()) {
fp->f_epolls->erase(i);
}
}
}
};

int epoll_create(int size)
Expand Down
23 changes: 23 additions & 0 deletions fs/vfs/kern_descrip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <osv/debug.h>
#include <osv/mutex.h>
#include <osv/rcu.hh>
#include <boost/range/algorithm/find.hpp>

#include <bsd/sys/sys/queue.h>

Expand Down Expand Up @@ -228,6 +229,28 @@ void file::stop_polls()
}
}

// Record `ep` in this file's list of epoll registrations.
//
// The list is allocated on first use, and `ep` is appended only when an
// equal entry is not already stored, so repeated calls with the same
// pointer leave a single entry. All of this happens under f_lock.
void file::epoll_add(epoll_ptr ep)
{
    WITH_LOCK(f_lock) {
        // Lazily create the container the first time anything registers.
        if (!f_epolls) {
            f_epolls.reset(new std::vector<epoll_ptr>);
        }
        auto& registered = *f_epolls;
        const bool already_present =
            boost::range::find(registered, ep) != registered.end();
        if (!already_present) {
            registered.push_back(ep);
        }
    }
}

// Remove `ep` from this file's list of epoll registrations.
//
// Removing an entry that is not currently in the list is a no-op. This
// matters because the epoll side drops a one-shot registration from this
// list as soon as it fires, yet keeps the key in its own map; a later
// delete or close of the epoll file then asks for the same entry to be
// removed a second time. Asserting presence would abort in that case, and
// with assertions compiled out it would call erase() on the end iterator,
// which is undefined behaviour.
//
// The list itself is still expected to exist: epoll_add() creates it
// before anything can be removed.
void file::epoll_del(epoll_ptr ep)
{
    WITH_LOCK(f_lock) {
        assert(f_epolls);
        auto found = boost::range::find(*f_epolls, ep);
        // Only erase a real element; end() must never be passed to erase().
        if (found != f_epolls->end()) {
            f_epolls->erase(found);
        }
    }
}

dentry* file_dentry(file* fp)
{
return fp->f_dentry.get();
Expand Down
4 changes: 2 additions & 2 deletions include/osv/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ struct file {
virtual int stat(struct stat* buf) = 0;
virtual int close() = 0;
virtual int chmod(mode_t mode) = 0;
virtual void epoll_add() {}
virtual void epoll_del() {}
virtual void epoll_add(epoll_ptr ep);
virtual void epoll_del(epoll_ptr ep);
virtual void poll_install(pollreq& pr) {}
virtual void poll_uninstall(pollreq& pr) {}
virtual std::unique_ptr<mmu::file_vma> mmap(addr_range range, unsigned flags, unsigned perm, off_t offset) {
Expand Down
4 changes: 2 additions & 2 deletions include/osv/socket.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public:
virtual int stat(struct stat* buf) override;
virtual int close() override;
virtual int chmod(mode_t mode) override;
virtual void epoll_add() override;
virtual void epoll_del() override;
virtual void epoll_add(epoll_ptr ep) override;
virtual void epoll_del(epoll_ptr ep) override;
virtual void poll_install(pollreq& pr) override;
virtual void poll_uninstall(pollreq& pr) override;
int bsd_ioctl(u_long cmd, void* data);
Expand Down

0 comments on commit 612f4eb

Please sign in to comment.