[PATCH 2/2] Fix futex timeouts, handle futex error codes, and add mtxtimedlock, semtimedwait, and condtimedwait.
[Thread Prev] | [Thread Next]
- Subject: [PATCH 2/2] Fix futex timeouts, handle futex error codes, and add mtxtimedlock, semtimedwait, and condtimedwait.
- From: iriri <iri@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx>
- Reply-to: myrddin-dev@xxxxxxxxxxxxxx
- Date: Sat, 28 Jul 2018 17:57:43 -0700
- To: "myrddin-dev" <myrddin-dev@xxxxxxxxxxxxxx>
Having `ftxwait` return `sys.errno` rather than a union type feels somewhat questionable. The fallbacks, as always, are also somewhat questionable. Handling `sys.Eintr` within `ftxwait` seems better than having to handle it everywhere `ftxwait` gets used. --- lib/sys/sys+linux-x64.myr | 6 ++- lib/thread/condvar+freebsd.myr | 24 +++++------ lib/thread/condvar+linux.myr | 28 +++++++------ lib/thread/condvar+openbsd:6.2.myr | 28 +++++++------ lib/thread/condvar+osx.myr | 24 +++++------ lib/thread/condvar.myr | 47 ++++++++++++++++++++-- lib/thread/futex+freebsd.myr | 46 ++++++++++++++------- lib/thread/futex+linux.myr | 53 ++++++++++++++++++++----- lib/thread/futex+openbsd:6.2.myr | 56 ++++++++++++++++++++++++-- lib/thread/futex+osx.myr | 64 ++++++++++++++++++++---------- lib/thread/mutex+futex.myr | 30 +++++++++----- lib/thread/mutex+plan9.myr | 13 ++++++ lib/thread/mutex.myr | 17 ++++++++ lib/thread/sem+futex.myr | 29 +++++++++----- lib/thread/sem+plan9.myr | 13 ++++++ lib/thread/sem.myr | 17 ++++++++ lib/thread/test/condvar.myr | 11 +++++ lib/thread/test/mutex.myr | 11 +++++ lib/thread/waitgrp+futex.myr | 2 +- 19 files changed, 393 insertions(+), 126 deletions(-) diff --git a/lib/sys/sys+linux-x64.myr b/lib/sys/sys+linux-x64.myr index 1cebb123..83fdea53 100644 --- a/lib/sys/sys+linux-x64.myr +++ b/lib/sys/sys+linux-x64.myr @@ -562,6 +562,8 @@ pkg sys = const Futexpriv : futexop = 128 const Futexclockrt : futexop = 256 + + const Futexbitsetmatchany : int32 = -1 /* poll events : posix */ const Pollin : pollevt = 0x001 /* There is data to read. */ @@ -1273,7 +1275,7 @@ pkg sys = const dup : (fd : fd -> fd) const dup2 : (src : fd, dst : fd -> fd) const futex : (uaddr : int32#, op : futexop, val : int32, \ - timeout : timespec#, uaddr2 : int32#, val3 : int32 -> int64) + timeout : timespec#, uaddr2 : int32#, val3 : int32 -> int) const semctl : (semid : int, semnum : int, cmd : int, arg : void# -> int) const epollcreate : (flg : epollflags -> fd) /* actually epoll_create1 */ const epollctl : (epfd : fd, op : int, fd : fd, evt : epollevt# -> int) @@ -1692,7 +1694,7 @@ pkg sys = /* threading */ const futex = {uaddr, op, val, timeout, uaddr2, val3 - -> syscall(Sysfutex, a(uaddr), a(op), a(val), a(timeout), a(uaddr2), a(val3)) + -> (syscall(Sysfutex, a(uaddr), a(op), a(val), a(timeout), a(uaddr2), a(val3)) : int) } const semctl = {semid, semnum, cmd, arg -> (syscall(Syssemctl, a(semnum), a(cmd), a(arg)) : int) diff --git a/lib/thread/condvar+freebsd.myr b/lib/thread/condvar+freebsd.myr index 002757ae..e27712f9 100644 --- a/lib/thread/condvar+freebsd.myr +++ b/lib/thread/condvar+freebsd.myr @@ -1,7 +1,7 @@ use std +use sys use "atomic" -use "common" use "mutex" use "futex" @@ -13,6 +13,7 @@ pkg thread = const mkcond : (mtx : mutex# -> cond) const condwait : (cond : cond# -> void) + const condtimedwait : (cond : cond#, tmout : uint32 -> bool) const condsignal : (cond : cond# -> void) const condbroadcast : (cond : cond# -> void) ;; @@ -22,20 +23,19 @@ const mkcond = {mtx } const condwait = {cond - var seq - var mtx - - mtx = cond._mtx - seq = cond._seq - mtxunlock(mtx) + condtimedwait(cond, 0) +} - /* - FIXME?: `ftxwait` can be interrupted but `condwait` should always be - done in a loop anyway. - */ - ftxwait(&cond._seq, seq, Zptr) +const condtimedwait = {cond, tmout + var seq = cond._seq + var mtx = cond._mtx + mtxunlock(mtx) + if ftxwait(&cond._seq, seq, tmout) == sys.Etimedout + -> false + ;; mtxlock(mtx) + -> true } const condsignal = {cond : cond# diff --git a/lib/thread/condvar+linux.myr b/lib/thread/condvar+linux.myr index e1a9e100..b3a29c7c 100644 --- a/lib/thread/condvar+linux.myr +++ b/lib/thread/condvar+linux.myr @@ -3,16 +3,18 @@ use sys use "atomic" use "common" +use "futex" use "mutex" pkg thread = type cond = struct _mtx : mutex# - _seq : int32 + _seq : ftxtag ;; const mkcond : (mtx : mutex# -> cond) const condwait : (cond : cond# -> void) + const condtimedwait : (cond : cond#, tmout : uint32 -> bool) const condsignal : (cond : cond# -> void) const condbroadcast : (cond : cond# -> void) ;; @@ -22,18 +24,17 @@ const mkcond = {mtx } const condwait = {cond - var seq - var mtx + condtimedwait(cond, 0) +} - mtx = cond._mtx - seq = cond._seq - mtxunlock(mtx) +const condtimedwait = {cond, tmout + var seq = cond._seq + var mtx = cond._mtx - /* - FIXME?: `futex` can be interrupted but `condwait` should always be done - in a loop anyway. - */ - sys.futex(&cond._seq, sys.Futexwait | sys.Futexpriv, seq, Zptr, Zptr, 0) + mtxunlock(mtx) + if ftxwait(&cond._seq, seq, tmout) == sys.Etimedout + -> false + ;; /* We need to atomically set the mutex to contended. This allows us to @@ -41,11 +42,12 @@ const condwait = {cond unlocker of the mutex. */ mtxcontended(mtx) + -> true } const condsignal = {cond : cond# xadd(&cond._seq, 1) - sys.futex(&cond._seq, sys.Futexwake | sys.Futexpriv, 1, Zptr, Zptr, 0) + ftxwake(&cond._seq) } const condbroadcast = {cond : cond# @@ -55,7 +57,7 @@ const condbroadcast = {cond : cond# used for the number of threads to move, and is not ignored when requeueing */ - sys.futex(&cond._seq, sys.Futexrequeue | sys.Futexpriv, + sys.futex((&cond._seq : int32#), sys.Futexrequeue | sys.Futexpriv, 1, (0x7fffffff : sys.timespec#), (&cond._mtx._state : int32#), 0) } diff --git a/lib/thread/condvar+openbsd:6.2.myr b/lib/thread/condvar+openbsd:6.2.myr index c72d0ee2..d92aab36 100644 --- a/lib/thread/condvar+openbsd:6.2.myr +++ b/lib/thread/condvar+openbsd:6.2.myr @@ -3,16 +3,18 @@ use sys use "atomic" use "common" +use "futex" use "mutex" pkg thread = type cond = struct _mtx : mutex# - _seq : uint32 + _seq : ftxtag ;; const mkcond : (mtx : mutex# -> cond) const condwait : (cond : cond# -> void) + const condtimedwait : (cond : cond#, tmout : uint32 -> bool) const condsignal : (cond : cond# -> void) const condbroadcast : (cond : cond# -> void) ;; @@ -22,18 +24,17 @@ const mkcond = {mtx } const condwait = {cond - var seq - var mtx + condtimedwait(cond, 0) +} - mtx = cond._mtx - seq = cond._seq - mtxunlock(mtx) +const condtimedwait = {cond, tmout + var seq = cond._seq + var mtx = cond._mtx - /* - FIXME?: `futex` can be interrupted but `condwait` should always be done - in a loop anyway. - */ - sys.futex(&cond._seq, sys.Futexwait, seq, Zptr, Zptr) + mtxunlock(mtx) + if ftxwait(&cond._seq, seq, tmout) == sys.Etimedout + -> false + ;; /* We need to atomically set the mutex to contended. This allows us to @@ -41,16 +42,17 @@ const condwait = {cond unlocker of the mutex. */ mtxcontended(mtx) + -> true } const condsignal = {cond : cond# xadd(&cond._seq, 1) - sys.futex(&cond._seq, sys.Futexwake, 1, Zptr, Zptr) + ftxwake(&cond._seq) } const condbroadcast = {cond : cond# xadd(&cond._seq, 1) - sys.futex(&cond._seq, sys.Futexrequeue, 1, + sys.futex((&cond._seq : uint32#), sys.Futexrequeue, 1, (0x7fffffff : sys.timespec#), (&cond._mtx._state : uint32#)) } diff --git a/lib/thread/condvar+osx.myr b/lib/thread/condvar+osx.myr index d74c321f..cfab7654 100644 --- a/lib/thread/condvar+osx.myr +++ b/lib/thread/condvar+osx.myr @@ -1,7 +1,7 @@ use std +use sys use "atomic" -use "common" use "mutex" use "futex" @@ -13,6 +13,7 @@ pkg thread = const mkcond : (mtx : mutex# -> cond) const condwait : (cond : cond# -> void) + const condtimedwait : (cond : cond#, tmout : uint32 -> bool) const condsignal : (cond : cond# -> void) const condbroadcast : (cond : cond# -> void) ;; @@ -22,20 +23,19 @@ const mkcond = {mtx } const condwait = {cond - var seq - var mtx - - mtx = cond._mtx - seq = cond._seq - mtxunlock(mtx) + condtimedwait(cond, 0) +} - /* - FIXME?: `ftxwait` can be interrupted but `condwait` should always be - done in a loop anyway. - */ - ftxwait(&cond._seq, seq, Zptr) +const condtimedwait = {cond, tmout + var seq = cond._seq + var mtx = cond._mtx + mtxunlock(mtx) + if ftxwait(&cond._seq, seq, tmout) == sys.Etimedout + -> false + ;; mtxlock(mtx) + -> true } const condsignal = {cond : cond# diff --git a/lib/thread/condvar.myr b/lib/thread/condvar.myr index 4fde7073..8971d029 100644 --- a/lib/thread/condvar.myr +++ b/lib/thread/condvar.myr @@ -14,13 +14,14 @@ pkg thread = const mkcond : (mtx : mutex# -> cond) const condwait : (cond : cond# -> void) + const condtimedwait : (cond : cond#, tmout : uint32 -> bool) const condsignal : (cond : cond# -> void) const condbroadcast : (cond : cond# -> void) ;; /* -The waitqueue is a doubly-linked list because we'll need to remove waiters from -anywhere in the list when we add timeout support. +The waitqueue is a doubly-linked list because we need to remove waiters from +anywhere in the list when we timeout. `cond._waitq.prev` is the tail of the queue. */ @@ -35,6 +36,10 @@ const mkcond = {mtx } const condwait = {cond + condtimedwait(cond, 0) +} + +const condtimedwait = {cond, tmout var mtx = cond._mtx var lock = &cond._lock var waiter = std.mk([.sem = mksem(0)]) @@ -49,12 +54,44 @@ const condwait = {cond waiter.prev.next = waiter q.prev = waiter ;; - mtxunlock(lock) mtxunlock(mtx) - semwait(&waiter.sem) + + /* + If we time out, attempt to remove ourselves from the waitqueue. If we + get signalled before we can do this, consider ourselves successfully + woken regardless of how much time has elapsed. + */ + if !semtimedwait(&waiter.sem, tmout) + var rc = false + + mtxlock(lock) + if waiter.prev == Zptr /* Already shifted off the front */ + rc = true + goto ret + ;; + if waiter.prev == waiter /* We are the only waiter in the queue */ + cond._waitq = Zptr + goto ret + ;; + + if waiter.prev.next == waiter + waiter.prev.next = waiter.next + ;; + if waiter.next != Zptr + waiter.next.prev = waiter.prev + else + cond._waitq.prev = waiter.prev + ;; +:ret + mtxunlock(lock) + std.free(waiter) + -> rc + ;; + std.free(waiter) mtxlock(mtx) + -> true } const condsignal = {cond @@ -66,6 +103,7 @@ const condsignal = {cond if head.next != Zptr head.next.prev = head.prev ;; + head.prev = Zptr cond._waitq = head.next sempost(&head.sem) ;; @@ -82,6 +120,7 @@ const condbroadcast = {cond mtxlock(lock) while (head = cond._waitq) != Zptr + head.prev = Zptr cond._waitq = head.next sempost(&head.sem) ;; diff --git a/lib/thread/futex+freebsd.myr b/lib/thread/futex+freebsd.myr index ea5f28d8..b291240f 100644 --- a/lib/thread/futex+freebsd.myr +++ b/lib/thread/futex+freebsd.myr @@ -1,3 +1,4 @@ +use std use sys use "atomic" @@ -7,26 +8,43 @@ pkg thread = type ftxtag = uint32 impl atomic ftxtag - const ftxwait : (uaddr : ftxtag#, val : ftxtag, timeout : sys.timespec# -> int) + const ftxwait : (uaddr : ftxtag#, val : ftxtag, tmout : uint32 -> sys.errno) const ftxwake : (uaddr : ftxtag# -> int) const ftxwakeall : (uaddr : ftxtag# -> int) ;; -const ftxwait = {uaddr, val, timeout - if timeout == Zptr - -> sys.umtx_op((uaddr : void#), sys.Umtxwaituintpriv, (val : uint64), Zptr, Zptr) +const ftxwait = {uaddr, val, tmout + var ut : sys._umtx_time, utp, utsize, rc + + if tmout == Zptr + utp = Zptr + utsize = Zptr + else + var t = (tmout : int64) + std.assert(sys.clock_gettime(`sys.Clockmonotonic, &ut._tmout) == 0, + "error: clock_gettime returned -1\n") + ut._tmout.nsec += (t % 1_000_000) * 1000 + ut._tmout.sec += (ut._tmout.nsec / 1_000_000_000) + (t / 1_000_000) + ut._tmout.nsec %= 1_000_000_000 + ut._flags = (sys.Umtxabstime : uint32), + ut._clockid = 1, /* CLOCK_MONOTONIC. Not exported from sys. */ + utp = &ut + utsize = (sizeof(sys._umtx_time) : void#) + ;; + + while ((rc = sys.umtx_op((uaddr : void#), + sys.Umtxwaituintpriv, + (val : uint64), + utsize, + utp) : sys.errno)) == sys.Eintr ;; - var ut : sys._umtx_time = [ - ._timeout = timeout#, - ._flags = (sys.Umtxabstime : uint32), - ._clockid = 1, /* CLOCK_MONOTONIC. Not exported from sys. */ - ] - -> sys.umtx_op((uaddr : void#), - sys.Umtxwaituintpriv, - (val : uint64), - (sizeof(sys._umtx_time) : void#), - (&ut : void#)) + match rc + | 0: -> 0 + | sys.Eagain: -> sys.Eagain + | sys.Etimedout: -> sys.Etimedout + | err: std.fatal("error: futex returned {}\n", err) + ;; } const ftxwake = {uaddr diff --git a/lib/thread/futex+linux.myr b/lib/thread/futex+linux.myr index c5dbf062..c5c3d56a 100644 --- a/lib/thread/futex+linux.myr +++ b/lib/thread/futex+linux.myr @@ -1,3 +1,4 @@ +use std use sys use "atomic" @@ -7,26 +8,58 @@ pkg thread = type ftxtag = uint32 impl atomic ftxtag - const ftxwait : (uaddr : ftxtag#, val : ftxtag, timeout : sys.timespec# -> int) + const ftxwait : (uaddr : ftxtag#, val : ftxtag, tmout : uint32 -> sys.errno) const ftxwake : (uaddr : ftxtag# -> int) const ftxwakeall : (uaddr : ftxtag# -> int) ;; -const ftxwait = {uaddr, val, timeout - -> (sys.futex((uaddr : int32#), - sys.Futexwait | sys.Futexpriv, - (val : int32), - timeout, - Zptr, - 0) : int) +const ftxwait = {uaddr, val, tmout + var rc + + if tmout == 0 + while (rc = (sys.futex((uaddr : int32#), + sys.Futexwait | sys.Futexpriv, + (val : int32), + Zptr, + Zptr, + 0) : sys.errno)) == sys.Eintr + ;; + else + var t = (tmout : int64) + var ts + std.assert(sys.clock_gettime(`sys.Clockmonotonic, &ts) == 0, + "error: clock_gettime returned -1\n") + ts.nsec += (t % 1_000_000) * 1000 + ts.sec += (ts.nsec / 1_000_000_000) + (t / 1_000_000) + ts.nsec %= 1_000_000_000 + + /* + Futexwaitbitset + Futexbitsetmatchany causes the timeout to be + treated as absolute rather than relative. + */ + while (rc = (sys.futex((uaddr : int32#), + sys.Futexwaitbitset | sys.Futexpriv, + (val : int32), + &ts, + Zptr, + sys.Futexbitsetmatchany) : sys.errno)) == sys.Eintr + ;; + ;; + + match rc + | 0: -> 0 + | sys.Eagain: -> sys.Eagain + | sys.Etimedout: -> sys.Etimedout + | err: std.fatal("error: futex returned {}\n", err) + ;; } const ftxwake = {uaddr - -> (sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 1, Zptr, Zptr, 0) : int) + -> sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 1, Zptr, Zptr, 0) } const ftxwakeall = {uaddr - -> (sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 0x7fffffff, Zptr, Zptr, 0) : int) + -> sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 0x7fffffff, Zptr, Zptr, 0) } impl atomic ftxtag = diff --git a/lib/thread/futex+openbsd:6.2.myr b/lib/thread/futex+openbsd:6.2.myr index e3c9c413..87d0a049 100644 --- a/lib/thread/futex+openbsd:6.2.myr +++ b/lib/thread/futex+openbsd:6.2.myr @@ -1,3 +1,4 @@ +use std use sys use "atomic" @@ -7,13 +8,62 @@ pkg thread = type ftxtag = uint32 impl atomic ftxtag - const ftxwait : (uaddr : ftxtag#, val : ftxtag, timeout : sys.timespec# -> int) + const ftxwait : (uaddr : ftxtag#, val : ftxtag, tmout : uint32 -> sys.errno) const ftxwake : (uaddr : ftxtag# -> int) const ftxwakeall : (uaddr : ftxtag# -> int) ;; -const ftxwait = {uaddr, val, timeout - -> sys.futex((uaddr : uint32#), sys.Futexwait, (val : int), timeout, Zptr) +const ftxwait = {uaddr, val, tmout + var rc + + if tmout == Zptr + while (rc = (sys.futex((uaddr : uint32#), + sys.Futexwait, + (val : int), + Zptr, + Zptr) : sys.errno)) == sys.Eintr + ;; + else + var t = (tmout : uint64) + var start + std.assert(sys.clock_gettime(`sys.Clockmonotonic, &start) == 0, + "error: clock_gettime returned -1\n") + var ts = [ + .sec = t / 1_000_000 + .nsec = (t % 1_000_000) * 1000 + ] + + while (rc = (sys.futex((uaddr : uint32#), + sys.Futexwait, + (val : int), + &ts, + Zptr) : sys.errno)) == sys.Eintr + var now + std.assert(sys.clock_gettime(`sys.Clockmonotonic, &now) == 0, + "error: clock_gettime returned -1\n") + var t1 = t - (((now.sec - start.sec) * 1_000_000) : uint32) + var nsec = now.nsec - start.nsec + if nsec >= 0 + t1 -= (nsec / 1000 : uint32) + else + t1 -= ((1_000_000_000 + nsec) / 1000 : uint32) + ;; + + if t1 > t + -> sys.Etimedout + ;; + ts.sec = t1 / 1_000_000 + ts.nsec = (t1 % 1_000_000) * 1000 + t = t1 + ;; + ;; + + match rc + | 0: -> 0 + | sys.Eagain: -> sys.Eagain + | sys.Etimedout: -> sys.Etimedout + | err: std.fatal("error: futex returned {}\n", err) + ;; } const ftxwake = {uaddr diff --git a/lib/thread/futex+osx.myr b/lib/thread/futex+osx.myr index 17478703..167434f1 100644 --- a/lib/thread/futex+osx.myr +++ b/lib/thread/futex+osx.myr @@ -8,7 +8,7 @@ pkg thread = type ftxtag = uint64 impl atomic ftxtag - const ftxwait : (uaddr : ftxtag#, val : ftxtag, timeout : sys.timespec# -> int) + const ftxwait : (uaddr : ftxtag#, val : ftxtag, tmout : uint32 -> sys.errno) const ftxwake : (uaddr : ftxtag# -> int) const ftxwakeall : (uaddr : ftxtag# -> int) ;; @@ -17,32 +17,52 @@ pkg thread = * The ulock_ functions are undocumented but the relevant source can be found at * https://github.com/apple/darwin-xnu/blob/0a798f6738bc1db01281fc08ae024145e84df927/bsd/kern/sys_ulock.c */ -const ftxwait = {uaddr, val, timeout - if timeout == Zptr - -> sys.ulock_wait(sys.Ulockcompareandwait, (uaddr : uint64#), (val : uint64), 0) - ;; +const ftxwait = {uaddr, val, tmout + var rc - var ts - var err = sys.clock_gettime(`sys.Clockmonotonic, &ts) - std.assert(err == 0, "error: clock_gettime returned {}\n", err) - if timeout.sec < ts.sec - -> (std.Etimedout : int) - ;; + if tmout == 0 + while (rc = (sys.ulock_wait(sys.Ulockcompareandwait, + (uaddr : uint64#), + (val : uint64), + 0) : sys.errno)) == sys.Eintr + ;; + else + var start + std.assert(sys.clock_gettime(`sys.Clockmonotonic, &start) == 0, + "error: clock_gettime returned -1\n") + + while (rc = (sys.ulock_wait(sys.Ulockcompareandwait, + (uaddr : uint64#), + (val : uint64), + tmout) : sys.errno)) == sys.Eintr + var now + std.assert(sys.clock_gettime(`sys.Clockmonotonic, &now) == 0, + "error: clock_gettime returned -1\n") + var t = tmout - (((now.sec - start.sec) * 1_000_000) : uint32) + var nsec = now.nsec - start.nsec + if nsec >= 0 + t -= (nsec / 1000 : uint32) + else + t -= ((1_000_000_000 + nsec) / 1000 : uint32) + ;; - var usec = 0 - var sec = (timeout.sec - ts.sec) * 1000 - std.assert(sec <= 0xffffffff, "error: maximum futex timeout exceeded\n") - usec = (sec : uint32) - if timeout.nsec > ts.nsec - var nsec = (timeout.nsec - ts.nsec) / 1000 - std.assert(usec + nsec > usec, "error: maximum futex timeout exceeded\n") - usec += nsec + if t > tmout + -> sys.Etimedout + ;; + tmout = t + ;; ;; - if usec == 0 - -> (std.Etimedout : int) + match rc + | 0: -> 0 + | sys.Eagain: -> sys.Eagain + | sys.Etimedout: -> sys.Etimedout + | err: + if err > 0 + -> 0 + ;; + std.fatal("error: ulock_wait returned {}\n", err) ;; - -> sys.ulock_wait(sys.Ulockcompareandwait, (uaddr : uint64#), (val : uint64), usec) } const ftxwake = {uaddr diff --git a/lib/thread/mutex+futex.myr b/lib/thread/mutex+futex.myr index c8d40c61..72028f82 100644 --- a/lib/thread/mutex+futex.myr +++ b/lib/thread/mutex+futex.myr @@ -1,3 +1,5 @@ +use sys + use "atomic" use "common" use "futex" @@ -10,6 +12,7 @@ pkg thread = const mkmtx : (-> mutex) const mtxlock : (mtx : mutex# -> void) const mtxtrylock : (mtx : mutex# -> bool) + const mtxtimedlock : (mtx : mutex# , tmout : uint32 -> bool) const mtxunlock : (mtx : mutex# -> void) pkglocal const mtxcontended : (mtx : mutex# -> void) @@ -26,16 +29,24 @@ const mkmtx = { } const mtxlock = {mtx + mtxtimedlock(mtx, 0) +} + +const mtxtrylock = {mtx + -> xcas(&mtx._state, Unlocked, Locked) == Unlocked +} + +const mtxtimedlock = {mtx, tmout var c - /* + /* Uncontended case: we get an unlocked mutex, and we lock it. */ - c = Locked + c = Locked for var i = 0; i < nspin; i++ - c = xcas(&mtx._state, Unlocked, Locked) + c = xcas(&mtx._state, Unlocked, Locked) if c == Unlocked - -> void + -> true ;; ;; @@ -49,13 +60,12 @@ const mtxlock = {mtx ;; while c != Unlocked - ftxwait(&mtx._state, Contended, Zptr) + if ftxwait(&mtx._state, Contended, tmout) == sys.Etimedout + -> false + ;; c = xchg(&mtx._state, Contended) ;; -} - -const mtxtrylock = {mtx - -> xcas(&mtx._state, Unlocked, Locked) == Unlocked + -> true } const mtxunlock = {mtx @@ -76,6 +86,6 @@ const mtxunlock = {mtx const mtxcontended = {mtx while xchg(&mtx._state, Contended) != Unlocked - ftxwait(&mtx._state, Contended, Zptr) + ftxwait(&mtx._state, Contended, 0) ;; } diff --git a/lib/thread/mutex+plan9.myr b/lib/thread/mutex+plan9.myr index e2e1207f..c4fa5482 100644 --- a/lib/thread/mutex+plan9.myr +++ b/lib/thread/mutex+plan9.myr @@ -14,6 +14,7 @@ pkg thread = const mkmtx : (-> mutex) const mtxlock : (mtx : mutex# -> void) const mtxtrylock : (mtx : mutex# -> bool) + const mtxtimedlock : (mtx : mutex# , tmout : uint32 -> bool) const mtxunlock : (mtx : mutex# -> void) ;; @@ -36,6 +37,18 @@ const mtxtrylock = {mtx -> xcas(&mtx._state, 0, 1) == 0 } +const mtxtimedlock = {mtx, tmout + var iters = tmout / 100 + + for var i = 0; i < iters; i++ + if xcas(&mtx._state, 0, 1) == 0 + -> true + ;; + std.nanosleep(90_000) + ;; + -> false +} + const mtxunlock = {mtx /* if we were the only thread waiting on the lock, there was no contention */ diff --git a/lib/thread/mutex.myr b/lib/thread/mutex.myr index 5b905749..df1fdb88 100644 --- a/lib/thread/mutex.myr +++ b/lib/thread/mutex.myr @@ -13,6 +13,7 @@ pkg thread = const mkmtx : (-> mutex) const mtxlock : (mtx : mutex# -> void) const mtxtrylock : (mtx : mutex# -> bool) + const mtxtimedlock : (mtx : mutex# , tmout : uint32 -> bool) const mtxunlock : (mtx : mutex# -> void) ;; @@ -59,6 +60,22 @@ const mtxtrylock = {mtx -> xcas(&mtx._state, 0, 1) == 0 } +const mtxtimedlock = {mtx, tmout + if tmout == 0 + mtxlock(mtx) + -> true + ;; + + var iters = tmout / 100 + for var i = 0; i < iters; i++ + if xcas(&mtx._state, 0, 1) == 0 + -> true + ;; + std.nanosleep(90_000) + ;; + -> false +} + const mtxunlock = {mtx xset(&mtx._state, 0) diff --git a/lib/thread/sem+futex.myr b/lib/thread/sem+futex.myr index d79bd41b..6e91339b 100644 --- a/lib/thread/sem+futex.myr +++ b/lib/thread/sem+futex.myr @@ -1,4 +1,5 @@ use std +use sys use "atomic" use "common" @@ -12,6 +13,7 @@ pkg thread = const mksem : (v : uint32 -> sem) const semwait : (s : sem# -> void) const semtrywait : (s : sem# -> bool) + const semtimedwait : (sem : sem#, tmout : uint32 -> bool) const sempost : (s : sem# -> void) ;; @@ -20,16 +22,7 @@ const mksem = {v } const semwait = {s - var v = 0 - - for ; ; - while (v = s._val) > 0 - if xcas(&s._val, v, v - 1) == v - -> void - ;; - ;; - ftxwait(&s._val, v, Zptr) - ;; + semtimedwait(s, 0) } const semtrywait = {s @@ -45,6 +38,22 @@ const semtrywait = {s -> false /* Unreachable */ } +const semtimedwait = {s, tmout + var v = 0 + + for ; ; + while (v = s._val) > 0 + if xcas(&s._val, v, v - 1) == v + -> true + ;; + ;; + if ftxwait(&s._val, v, tmout) == sys.Etimedout + -> false + ;; + ;; + -> false /* Unreachable */ +} + const sempost = {s std.assert((xadd(&s._val, 1) : uint32) != ~0x0, "error: semaphore overflowed\n") diff --git a/lib/thread/sem+plan9.myr b/lib/thread/sem+plan9.myr index 221a8056..17b8d0c0 100644 --- a/lib/thread/sem+plan9.myr +++ b/lib/thread/sem+plan9.myr @@ -13,6 +13,7 @@ pkg thread = const mksem : (v : uint32 -> sem) const semwait : (s : sem# -> void) const semtrywait : (s : sem# -> bool) + const semtimedwait : (sem : sem#, tmout : uint32 -> bool) const sempost : (s : sem# -> void) ;; @@ -46,6 +47,18 @@ const semtrywait = {s -> false /* Unreachable */ } +const semtimedwait = {mtx, tmout + var iters = tmout / 100 + + for var i = 0; i < iters; i++ + if semtrywait(s) + -> true + ;; + std.nanosleep(90_000) + ;; + -> false +} + const sempost = {s var u = xadd(&s._user, 1) std.assert(u != 0x7fffffff, "error: semaphore overflowed\n") diff --git a/lib/thread/sem.myr b/lib/thread/sem.myr index 7aea35f3..9d83a7ee 100644 --- a/lib/thread/sem.myr +++ b/lib/thread/sem.myr @@ -10,6 +10,7 @@ pkg thread = const mksem : (v : uint32 -> sem) const semwait : (s : sem# -> void) const semtrywait : (s : sem# -> bool) + const semtimedwait : (sem : sem#, tmout : uint32 -> bool) const sempost : (s : sem# -> void) ;; @@ -61,6 +62,22 @@ const semtrywait = {s -> false /* Unreachable */ } +const semtimedwait = {s, tmout + if tmout == 0 + semwait(s) + -> true + ;; + + var iters = tmout / 100 + for var i = 0; i < iters; i++ + if semtrywait(s) + -> true + ;; + std.nanosleep(90_000) + ;; + -> false +} + const sempost = {s std.assert(xadd(&s._val, 1) != ~0x0, "error: semaphore overflowed\n") } diff --git a/lib/thread/test/condvar.myr b/lib/thread/test/condvar.myr index 92380dc5..8a8e5039 100644 --- a/lib/thread/test/condvar.myr +++ b/lib/thread/test/condvar.myr @@ -33,6 +33,17 @@ const main = { while nwoken != 100 thread.condbroadcast(&cv) ;; + + thread.mtxlock(&mtx) + var start = std.now() + std.assert(!thread.condtimedwait(&cv, 1_000_000), + "condtimedwait is broken: expected sys.Etimedout\n") + var elapsed = std.now() - start + if elapsed < 900_000 + std.fatal("condtimedwait returned early after {} microseconds\n", elapsed) + elif elapsed > 1_500_000 + std.fatal("condtimedwait returned late after {} microseconds\n", elapsed) + ;; } const cvwait = { diff --git a/lib/thread/test/mutex.myr b/lib/thread/test/mutex.myr index fd58df13..a1041356 100644 --- a/lib/thread/test/mutex.myr +++ b/lib/thread/test/mutex.myr @@ -21,6 +21,17 @@ const main = { if val != 1000 * 20 std.fatal("mutexes are broken, got {}\n", val) ;; + + thread.mtxlock(&mtx) + var start = std.now() + std.assert(!thread.mtxtimedlock(&mtx, 1_000_000), + "mtxtimedlock is broken: expected sys.Etimedout\n") + var elapsed = std.now() - start + if elapsed < 900_000 + std.fatal("mtxtimedlock returned early after {} microseconds\n", elapsed) + elif elapsed > 1_500_000 + std.fatal("mtxtimedlock returned late after {} microseconds\n", elapsed) + ;; } const incvar = { diff --git a/lib/thread/waitgrp+futex.myr b/lib/thread/waitgrp+futex.myr index 7fbd5eb6..7db7dce2 100644 --- a/lib/thread/waitgrp+futex.myr +++ b/lib/thread/waitgrp+futex.myr @@ -22,7 +22,7 @@ const wgwait = {w var v = 0 while (v = xget(&w._val)) != 0 - ftxwait(&w._val, v, Zptr) + ftxwait(&w._val, v, 0) ;; } -- 2.18.0
- Prev by Date: [PATCH 1/2] Make timespec/timeval struct members signed to simplify arithmatic.
- Next by Date: Re: union member already defined?
- Previous by thread: [PATCH 1/2] Make timespec/timeval struct members signed to simplify arithmatic.
- Index(es):