Subject: [PATCH 2/2] Add/fix condvar implementations.
[Thread Prev] | [Thread Next]
- Subject: Subject: [PATCH 2/2] Add/fix condvar implementations.
- From: iriri <iri@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx>
- Reply-to: myrddin-dev@xxxxxxxxxxxxxx
- Date: Mon, 23 Jul 2018 23:20:20 -0700
- To: "myrddin-dev" <myrddin-dev@xxxxxxxxxxxxxx>
--- lib/thread/bld.sub | 22 ++++---- lib/thread/condvar+freebsd.myr | 33 +++++------- lib/thread/condvar+linux.myr | 18 +++---- lib/thread/condvar+openbsd:6.2.myr | 56 +++++++++++++++++++ lib/thread/condvar+osx.myr | 53 ++++++++++++++++++ lib/thread/condvar.myr | 87 ++++++++++++++++++++++++++++++ lib/thread/mutex+futex.myr | 20 ++++--- lib/thread/test/condvar.myr | 46 ++++++---------- 8 files changed, 263 insertions(+), 72 deletions(-) create mode 100644 lib/thread/condvar+openbsd:6.2.myr create mode 100644 lib/thread/condvar+osx.myr create mode 100644 lib/thread/condvar.myr diff --git a/lib/thread/bld.sub b/lib/thread/bld.sub index a0c1a745..ed2ea2dc 100644 --- a/lib/thread/bld.sub +++ b/lib/thread/bld.sub @@ -1,25 +1,28 @@ lib thread = common.myr hookstd.myr # install thread hooks + + # generic fallbacks + condvar.myr + mutex.myr + ncpu.myr + sem.myr + waitgrp.myr + + # futex-based impls mutex+futex.myr sem+futex.myr waitgrp+futex.myr - mutex.myr # fallback, for unimplemented platforms - sem.myr # fallback, for unimplemented platforms - waitgrp.myr # fallback, for unimplemented platforms - - #generic fallbacks - ncpu.myr # linux impl of basic thread primitives - #condvar+linux.myr + condvar+linux.myr exit+linux-x64.s futex+linux.myr ncpu+linux.myr spawn+linux.myr # freebsd impl of thread primitives - #condvar+freebsd.myr + condvar+freebsd.myr exit+freebsd-x64.s futex+freebsd.myr ncpu+freebsd.myr @@ -33,7 +36,7 @@ lib thread = #exit+netbsd-x64.s # osx impl of thread primitives - #condvar+osx.myr + condvar+osx.myr futex+osx.myr spawn+osx.myr start+osx-x64.s @@ -47,6 +50,7 @@ lib thread = spawn+plan9.myr # openbsd impl of thread primitives + condvar+openbsd:6.2.myr exit+openbsd-x64.s futex+openbsd:6.2.myr ncpu+openbsd.myr diff --git a/lib/thread/condvar+freebsd.myr b/lib/thread/condvar+freebsd.myr index 65267a4a..002757ae 100644 --- a/lib/thread/condvar+freebsd.myr +++ b/lib/thread/condvar+freebsd.myr @@ -1,14 +1,14 @@ use std -use sys use "atomic" use "common" use 
"mutex" +use "futex" pkg thread = type cond = struct _mtx : mutex# - _seq : uint32 + _seq : ftxtag ;; const mkcond : (mtx : mutex# -> cond) @@ -27,33 +27,28 @@ const condwait = {cond mtx = cond._mtx seq = cond._seq - mtxunlock(mtx) - sys.umtx_op((&cond._seq : void#), \ - sys.Umtxwaituintpriv, \ - (seq : uint64), \ - Zptr, Zptr) /* - We need to atomically set the mutex to contended. This allows us to - pass responsibility for waking up the potential other waiters on to the - unlocker of the mutex. + FIXME?: `ftxwait` can be interrupted but `condwait` should always be + done in a loop anyway. */ - while xchg(&mtx._state, Contended) != Unlocked - sys.umtx_op((&mtx._state : void#), \ - sys.Umtxwaituintpriv, \ - (Contended : uint64), \ - Zptr, Zptr) - ;; + ftxwait(&cond._seq, seq, Zptr) + + mtxlock(mtx) } const condsignal = {cond : cond# xadd(&cond._seq, 1) - sys.umtx_op((&cond._seq : void#), sys.Umtxwakepriv, 1, Zptr, Zptr) + ftxwake(&cond._seq) } +/* +`umtx_op` fully supports implementing condvars efficiently but also requires +condvars to be implemented in a specific way. For now we'll just invite the +thundering herd. +*/ const condbroadcast = {cond : cond# xadd(&cond._seq, 1) - sys.umtx_op((&cond._seq : void#), sys.Umtxwakepriv, 0x7ffffff, Zptr, Zptr) + ftxwakeall(&cond._seq) } - diff --git a/lib/thread/condvar+linux.myr b/lib/thread/condvar+linux.myr index cd4ab8c8..e1a9e100 100644 --- a/lib/thread/condvar+linux.myr +++ b/lib/thread/condvar+linux.myr @@ -27,8 +27,12 @@ const condwait = {cond mtx = cond._mtx seq = cond._seq - mtxunlock(mtx) + + /* + FIXME?: `futex` can be interrupted but `condwait` should always be done + in a loop anyway. + */ sys.futex(&cond._seq, sys.Futexwait | sys.Futexpriv, seq, Zptr, Zptr, 0) /* @@ -36,10 +40,7 @@ const condwait = {cond pass responsibility for waking up the potential other waiters on to the unlocker of the mutex. 
*/ - while xchg(&mtx._state, Contended) != Unlocked - sys.futex(&mtx._state, sys.Futexwait | sys.Futexpriv, \ - Contended, Zptr, Zptr, 0) - ;; + mtxcontended(mtx) } const condsignal = {cond : cond# @@ -54,8 +55,7 @@ const condbroadcast = {cond : cond# used for the number of threads to move, and is not ignored when requeueing */ - sys.futex(&cond._seq, sys.Futexcmprequeue | sys.Futexpriv, \ - 1, (0x7fffffff : sys.timespec#), \ - &cond._mtx._state, cond._seq) + sys.futex(&cond._seq, sys.Futexrequeue | sys.Futexpriv, + 1, (0x7fffffff : sys.timespec#), + (&cond._mtx._state : int32#), 0) } - diff --git a/lib/thread/condvar+openbsd:6.2.myr b/lib/thread/condvar+openbsd:6.2.myr new file mode 100644 index 00000000..c72d0ee2 --- /dev/null +++ b/lib/thread/condvar+openbsd:6.2.myr @@ -0,0 +1,56 @@ +use std +use sys + +use "atomic" +use "common" +use "mutex" + +pkg thread = + type cond = struct + _mtx : mutex# + _seq : uint32 + ;; + + const mkcond : (mtx : mutex# -> cond) + const condwait : (cond : cond# -> void) + const condsignal : (cond : cond# -> void) + const condbroadcast : (cond : cond# -> void) +;; + +const mkcond = {mtx + -> [._mtx = mtx, ._seq = 0] +} + +const condwait = {cond + var seq + var mtx + + mtx = cond._mtx + seq = cond._seq + mtxunlock(mtx) + + /* + FIXME?: `futex` can be interrupted but `condwait` should always be done + in a loop anyway. + */ + sys.futex(&cond._seq, sys.Futexwait, seq, Zptr, Zptr) + + /* + We need to atomically set the mutex to contended. This allows us to + pass responsibility for waking up the potential other waiters on to the + unlocker of the mutex. 
+ */ + mtxcontended(mtx) +} + +const condsignal = {cond : cond# + xadd(&cond._seq, 1) + sys.futex(&cond._seq, sys.Futexwake, 1, Zptr, Zptr) +} + +const condbroadcast = {cond : cond# + xadd(&cond._seq, 1) + sys.futex(&cond._seq, sys.Futexrequeue, 1, + (0x7fffffff : sys.timespec#), + (&cond._mtx._state : uint32#)) +} diff --git a/lib/thread/condvar+osx.myr b/lib/thread/condvar+osx.myr new file mode 100644 index 00000000..d74c321f --- /dev/null +++ b/lib/thread/condvar+osx.myr @@ -0,0 +1,53 @@ +use std + +use "atomic" +use "common" +use "mutex" +use "futex" + +pkg thread = + type cond = struct + _mtx : mutex# + _seq : ftxtag + ;; + + const mkcond : (mtx : mutex# -> cond) + const condwait : (cond : cond# -> void) + const condsignal : (cond : cond# -> void) + const condbroadcast : (cond : cond# -> void) +;; + +const mkcond = {mtx + -> [._mtx = mtx, ._seq = 0] +} + +const condwait = {cond + var seq + var mtx + + mtx = cond._mtx + seq = cond._seq + mtxunlock(mtx) + + /* + FIXME?: `ftxwait` can be interrupted but `condwait` should always be + done in a loop anyway. + */ + ftxwait(&cond._seq, seq, Zptr) + + mtxlock(mtx) +} + +const condsignal = {cond : cond# + xadd(&cond._seq, 1) + ftxwake(&cond._seq) +} + +/* +Yes, this invites the thundering herd but that's what OS X gets for not having +a requeue operation. 
+*/ +const condbroadcast = {cond : cond# + xadd(&cond._seq, 1) + ftxwakeall(&cond._seq) +} diff --git a/lib/thread/condvar.myr b/lib/thread/condvar.myr new file mode 100644 index 00000000..9649b920 --- /dev/null +++ b/lib/thread/condvar.myr @@ -0,0 +1,87 @@ +use "atomic" +use "common" +use "mutex" +use "sem" + +pkg thread = + type cond = struct + _mtx : mutex# + _waitq : condwaiter# + _lock : mutex + ;; + + const mkcond : (mtx : mutex# -> cond) + const condwait : (cond : cond# -> void) + const condsignal : (cond : cond# -> void) + const condbroadcast : (cond : cond# -> void) +;; + +/* +The waitqueue is a doubly-linked list because we'll need to remove waiters from +anywhere in the list when we add timeout support. + +`cond._waitq.prev` is the tail of the queue. +*/ +type condwaiter = struct + next : condwaiter# + prev : condwaiter# + sem : sem +;; + +const mkcond = {mtx + -> [._mtx = mtx, ._lock = mkmtx()] +} + +const condwait = {cond + var mtx = cond._mtx + var lock = &cond._lock + var waiter = [.sem = mksem(0)] + + mtxlock(lock) + match cond._waitq + | Zptr: + waiter.prev = &waiter + cond._waitq = &waiter + | q: + waiter.prev = q.prev + waiter.prev.next = &waiter + q.prev = &waiter + ;; + + mtxunlock(lock) + mtxunlock(mtx) + semwait(&waiter.sem) + + mtxlock(mtx) +} + +const condsignal = {cond + var lock = &cond._lock + + mtxlock(lock) + var head = cond._waitq + if head != Zptr + if head.next != Zptr + head.next.prev = head.prev + ;; + cond._waitq = head.next + sempost(&head.sem) + ;; + mtxunlock(lock) +} + +/* +Yes, this invites the thundering herd but that's what you get for not +supporting futexes at all. 
+*/ +const condbroadcast = {cond + var lock = &cond._lock + var head = Zptr + + mtxlock(lock) + while (head = cond._waitq) != Zptr + cond._waitq = head.next + sempost(&head.sem) + ;; + mtxunlock(lock) +} diff --git a/lib/thread/mutex+futex.myr b/lib/thread/mutex+futex.myr index 5878755a..c8d40c61 100644 --- a/lib/thread/mutex+futex.myr +++ b/lib/thread/mutex+futex.myr @@ -12,11 +12,13 @@ pkg thread = const mtxtrylock : (mtx : mutex# -> bool) const mtxunlock : (mtx : mutex# -> void) - pkglocal const Unlocked = 0 - pkglocal const Locked = 1 - pkglocal const Contended = 2 + pkglocal const mtxcontended : (mtx : mutex# -> void) ;; +const Unlocked = 0 +const Locked = 1 +const Contended = 2 + var nspin = 10 /* FIXME: pick a sane number, based on CPU count */ const mkmtx = { @@ -38,9 +40,9 @@ const mtxlock = {mtx ;; /* - Contended case: we set the lock state to Contended. This indicates that there - the lock is locked, and we potentially have threads waiting on it, which means - that we will need to wake them up. + Contended case: we set the lock state to Contended. This indicates that + the lock is locked, and we potentially have threads waiting on it, + which means that we will need to wake them up. 
*/ if c == Locked c = xchg(&mtx._state, Contended) @@ -71,3 +73,9 @@ const mtxunlock = {mtx /* wake one thread */ ftxwake(&mtx._state) } + +const mtxcontended = {mtx + while xchg(&mtx._state, Contended) != Unlocked + ftxwait(&mtx._state, Contended, Zptr) + ;; +} diff --git a/lib/thread/test/condvar.myr b/lib/thread/test/condvar.myr index 0f1de58e..92380dc5 100644 --- a/lib/thread/test/condvar.myr +++ b/lib/thread/test/condvar.myr @@ -1,55 +1,44 @@ use std use thread -use "util" +use thrtestutil const Nwakes = 1000 var cv +var cv1 var mtx var val -var done : int32 +var ready var nwoken : int32 -var nready : int32 -var locked : int32 const main = { - done = 0 + ready = thread.mkwg(2) val = 123 mtx = thread.mkmtx() cv = thread.mkcond(&mtx) + cv1 = thread.mkcond(&mtx) thread.spawn(cvwait) thread.spawn(cvwake) - while done == 0 - /* nothing */ - ;; + thread.wgwait(&ready) std.assert(nwoken == Nwakes, "wrong number of wakes") - std.assert(val == 123, "wrong val after all are done") + ready = thread.mkwg(100) nwoken = 0 - nready = 0 - mkherd(100, cvwaitonce) + thrtestutil.mkherd(100, cvwaitonce) - /* wait until the herd is ready */ - while nready != 100 /* 0 to 99 */ - /* nothing */ - ;; - while locked == 0 - /* nothing */ - ;; - thread.condbroadcast(&cv) + thread.wgwait(&ready) while nwoken != 100 - /* nothing */ + thread.condbroadcast(&cv) ;; - std.assert(nwoken == 100, "wrong thread count woken") - } const cvwait = { for var i = 0; i < Nwakes; i++ thread.mtxlock(&mtx) + thread.condsignal(&cv1) thread.condwait(&cv) std.assert(val == 456, "wrong val after signal\n") val = 123 @@ -57,29 +46,28 @@ const cvwait = { thread.xadd(&nwoken, 1) ;; - val = 123 - thread.xadd(&done, 1) - + thread.condsignal(&cv1) + thread.wgpost(&ready) } const cvwake = { while true thread.mtxlock(&mtx) val = 456 + thread.condsignal(&cv) + thread.condwait(&cv1) thread.mtxunlock(&mtx) - thread.condsignal(&cv) if nwoken >= Nwakes break ;; ;; + thread.wgpost(&ready) } const cvwaitonce = { - 
thread.xadd(&nready, 1) - thread.mtxlock(&mtx) - thread.xadd(&locked, 1) + thread.wgpost(&ready) thread.condwait(&cv) thread.mtxunlock(&mtx) -- 2.18.0
Re: Subject: [PATCH 2/2] Add/fix condvar implementations. (Ori Bernstein <ori@xxxxxxxxxxxxxx>)
- Prev by Date: [PATCH 1/2] Add ftxwakeall and waitgrps to libthread.
- Next by Date: Re: [PATCH 1/2] Add ftxwakeall and waitgrps to libthread.
- Previous by thread: Re: [PATCH 1/2] Add ftxwakeall and waitgrps to libthread.
- Next by thread: Re: Subject: [PATCH 2/2] Add/fix condvar implementations.
- Index(es):