[PATCH 2/2] Fix futex timeouts, handle futex error codes, and add mtxtimedlock, semtimedwait, and condtimedwait.
[Thread Prev] | [Thread Next]
- Subject: [PATCH 2/2] Fix futex timeouts, handle futex error codes, and add mtxtimedlock, semtimedwait, and condtimedwait.
- From: iriri <iri@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx>
- Reply-to: myrddin-dev@xxxxxxxxxxxxxx
- Date: Sat, 28 Jul 2018 17:57:43 -0700
- To: "myrddin-dev" <myrddin-dev@xxxxxxxxxxxxxx>
Having `ftxwait` return `sys.errno` rather than a union type feels somewhat questionable. The fallbacks, as always, are also somewhat questionable.
Handling `sys.Eintr` within `ftxwait` seems better than having to handle it everywhere `ftxwait` gets used.
---
lib/sys/sys+linux-x64.myr | 6 ++-
lib/thread/condvar+freebsd.myr | 24 +++++------
lib/thread/condvar+linux.myr | 28 +++++++------
lib/thread/condvar+openbsd:6.2.myr | 28 +++++++------
lib/thread/condvar+osx.myr | 24 +++++------
lib/thread/condvar.myr | 47 ++++++++++++++++++++--
lib/thread/futex+freebsd.myr | 46 ++++++++++++++-------
lib/thread/futex+linux.myr | 53 ++++++++++++++++++++-----
lib/thread/futex+openbsd:6.2.myr | 56 ++++++++++++++++++++++++--
lib/thread/futex+osx.myr | 64 ++++++++++++++++++++----------
lib/thread/mutex+futex.myr | 30 +++++++++-----
lib/thread/mutex+plan9.myr | 13 ++++++
lib/thread/mutex.myr | 17 ++++++++
lib/thread/sem+futex.myr | 29 +++++++++-----
lib/thread/sem+plan9.myr | 13 ++++++
lib/thread/sem.myr | 17 ++++++++
lib/thread/test/condvar.myr | 11 +++++
lib/thread/test/mutex.myr | 11 +++++
lib/thread/waitgrp+futex.myr | 2 +-
19 files changed, 393 insertions(+), 126 deletions(-)
diff --git a/lib/sys/sys+linux-x64.myr b/lib/sys/sys+linux-x64.myr
index 1cebb123..83fdea53 100644
--- a/lib/sys/sys+linux-x64.myr
+++ b/lib/sys/sys+linux-x64.myr
@@ -562,6 +562,8 @@ pkg sys =
const Futexpriv : futexop = 128
const Futexclockrt : futexop = 256
+
+ const Futexbitsetmatchany : int32 = -1
/* poll events : posix */
const Pollin : pollevt = 0x001 /* There is data to read. */
@@ -1273,7 +1275,7 @@ pkg sys =
const dup : (fd : fd -> fd)
const dup2 : (src : fd, dst : fd -> fd)
const futex : (uaddr : int32#, op : futexop, val : int32, \
- timeout : timespec#, uaddr2 : int32#, val3 : int32 -> int64)
+ timeout : timespec#, uaddr2 : int32#, val3 : int32 -> int)
const semctl : (semid : int, semnum : int, cmd : int, arg : void# -> int)
const epollcreate : (flg : epollflags -> fd) /* actually epoll_create1 */
const epollctl : (epfd : fd, op : int, fd : fd, evt : epollevt# -> int)
@@ -1692,7 +1694,7 @@ pkg sys =
/* threading */
const futex = {uaddr, op, val, timeout, uaddr2, val3
- -> syscall(Sysfutex, a(uaddr), a(op), a(val), a(timeout), a(uaddr2), a(val3))
+ -> (syscall(Sysfutex, a(uaddr), a(op), a(val), a(timeout), a(uaddr2), a(val3)) : int)
}
const semctl = {semid, semnum, cmd, arg
-> (syscall(Syssemctl, a(semnum), a(cmd), a(arg)) : int)
diff --git a/lib/thread/condvar+freebsd.myr b/lib/thread/condvar+freebsd.myr
index 002757ae..e27712f9 100644
--- a/lib/thread/condvar+freebsd.myr
+++ b/lib/thread/condvar+freebsd.myr
@@ -1,7 +1,7 @@
use std
+use sys
use "atomic"
-use "common"
use "mutex"
use "futex"
@@ -13,6 +13,7 @@ pkg thread =
const mkcond : (mtx : mutex# -> cond)
const condwait : (cond : cond# -> void)
+ const condtimedwait : (cond : cond#, tmout : uint32 -> bool)
const condsignal : (cond : cond# -> void)
const condbroadcast : (cond : cond# -> void)
;;
@@ -22,20 +23,19 @@ const mkcond = {mtx
}
const condwait = {cond
- var seq
- var mtx
-
- mtx = cond._mtx
- seq = cond._seq
- mtxunlock(mtx)
+ condtimedwait(cond, 0)
+}
- /*
- FIXME?: `ftxwait` can be interrupted but `condwait` should always be
- done in a loop anyway.
- */
- ftxwait(&cond._seq, seq, Zptr)
+const condtimedwait = {cond, tmout
+ var seq = cond._seq
+ var mtx = cond._mtx
+ mtxunlock(mtx)
+ if ftxwait(&cond._seq, seq, tmout) == sys.Etimedout
+ -> false
+ ;;
mtxlock(mtx)
+ -> true
}
const condsignal = {cond : cond#
diff --git a/lib/thread/condvar+linux.myr b/lib/thread/condvar+linux.myr
index e1a9e100..b3a29c7c 100644
--- a/lib/thread/condvar+linux.myr
+++ b/lib/thread/condvar+linux.myr
@@ -3,16 +3,18 @@ use sys
use "atomic"
use "common"
+use "futex"
use "mutex"
pkg thread =
type cond = struct
_mtx : mutex#
- _seq : int32
+ _seq : ftxtag
;;
const mkcond : (mtx : mutex# -> cond)
const condwait : (cond : cond# -> void)
+ const condtimedwait : (cond : cond#, tmout : uint32 -> bool)
const condsignal : (cond : cond# -> void)
const condbroadcast : (cond : cond# -> void)
;;
@@ -22,18 +24,17 @@ const mkcond = {mtx
}
const condwait = {cond
- var seq
- var mtx
+ condtimedwait(cond, 0)
+}
- mtx = cond._mtx
- seq = cond._seq
- mtxunlock(mtx)
+const condtimedwait = {cond, tmout
+ var seq = cond._seq
+ var mtx = cond._mtx
- /*
- FIXME?: `futex` can be interrupted but `condwait` should always be done
- in a loop anyway.
- */
- sys.futex(&cond._seq, sys.Futexwait | sys.Futexpriv, seq, Zptr, Zptr, 0)
+ mtxunlock(mtx)
+ if ftxwait(&cond._seq, seq, tmout) == sys.Etimedout
+ -> false
+ ;;
/*
We need to atomically set the mutex to contended. This allows us to
@@ -41,11 +42,12 @@ const condwait = {cond
unlocker of the mutex.
*/
mtxcontended(mtx)
+ -> true
}
const condsignal = {cond : cond#
xadd(&cond._seq, 1)
- sys.futex(&cond._seq, sys.Futexwake | sys.Futexpriv, 1, Zptr, Zptr, 0)
+ ftxwake(&cond._seq)
}
const condbroadcast = {cond : cond#
@@ -55,7 +57,7 @@ const condbroadcast = {cond : cond#
used for the number of threads to move, and is not ignored when
requeueing
*/
- sys.futex(&cond._seq, sys.Futexrequeue | sys.Futexpriv,
+ sys.futex((&cond._seq : int32#), sys.Futexrequeue | sys.Futexpriv,
1, (0x7fffffff : sys.timespec#),
(&cond._mtx._state : int32#), 0)
}
diff --git a/lib/thread/condvar+openbsd:6.2.myr b/lib/thread/condvar+openbsd:6.2.myr
index c72d0ee2..d92aab36 100644
--- a/lib/thread/condvar+openbsd:6.2.myr
+++ b/lib/thread/condvar+openbsd:6.2.myr
@@ -3,16 +3,18 @@ use sys
use "atomic"
use "common"
+use "futex"
use "mutex"
pkg thread =
type cond = struct
_mtx : mutex#
- _seq : uint32
+ _seq : ftxtag
;;
const mkcond : (mtx : mutex# -> cond)
const condwait : (cond : cond# -> void)
+ const condtimedwait : (cond : cond#, tmout : uint32 -> bool)
const condsignal : (cond : cond# -> void)
const condbroadcast : (cond : cond# -> void)
;;
@@ -22,18 +24,17 @@ const mkcond = {mtx
}
const condwait = {cond
- var seq
- var mtx
+ condtimedwait(cond, 0)
+}
- mtx = cond._mtx
- seq = cond._seq
- mtxunlock(mtx)
+const condtimedwait = {cond, tmout
+ var seq = cond._seq
+ var mtx = cond._mtx
- /*
- FIXME?: `futex` can be interrupted but `condwait` should always be done
- in a loop anyway.
- */
- sys.futex(&cond._seq, sys.Futexwait, seq, Zptr, Zptr)
+ mtxunlock(mtx)
+ if ftxwait(&cond._seq, seq, tmout) == sys.Etimedout
+ -> false
+ ;;
/*
We need to atomically set the mutex to contended. This allows us to
@@ -41,16 +42,17 @@ const condwait = {cond
unlocker of the mutex.
*/
mtxcontended(mtx)
+ -> true
}
const condsignal = {cond : cond#
xadd(&cond._seq, 1)
- sys.futex(&cond._seq, sys.Futexwake, 1, Zptr, Zptr)
+ ftxwake(&cond._seq)
}
const condbroadcast = {cond : cond#
xadd(&cond._seq, 1)
- sys.futex(&cond._seq, sys.Futexrequeue, 1,
+ sys.futex((&cond._seq : uint32#), sys.Futexrequeue, 1,
(0x7fffffff : sys.timespec#),
(&cond._mtx._state : uint32#))
}
diff --git a/lib/thread/condvar+osx.myr b/lib/thread/condvar+osx.myr
index d74c321f..cfab7654 100644
--- a/lib/thread/condvar+osx.myr
+++ b/lib/thread/condvar+osx.myr
@@ -1,7 +1,7 @@
use std
+use sys
use "atomic"
-use "common"
use "mutex"
use "futex"
@@ -13,6 +13,7 @@ pkg thread =
const mkcond : (mtx : mutex# -> cond)
const condwait : (cond : cond# -> void)
+ const condtimedwait : (cond : cond#, tmout : uint32 -> bool)
const condsignal : (cond : cond# -> void)
const condbroadcast : (cond : cond# -> void)
;;
@@ -22,20 +23,19 @@ const mkcond = {mtx
}
const condwait = {cond
- var seq
- var mtx
-
- mtx = cond._mtx
- seq = cond._seq
- mtxunlock(mtx)
+ condtimedwait(cond, 0)
+}
- /*
- FIXME?: `ftxwait` can be interrupted but `condwait` should always be
- done in a loop anyway.
- */
- ftxwait(&cond._seq, seq, Zptr)
+const condtimedwait = {cond, tmout
+ var seq = cond._seq
+ var mtx = cond._mtx
+ mtxunlock(mtx)
+ if ftxwait(&cond._seq, seq, tmout) == sys.Etimedout
+ -> false
+ ;;
mtxlock(mtx)
+ -> true
}
const condsignal = {cond : cond#
diff --git a/lib/thread/condvar.myr b/lib/thread/condvar.myr
index 4fde7073..8971d029 100644
--- a/lib/thread/condvar.myr
+++ b/lib/thread/condvar.myr
@@ -14,13 +14,14 @@ pkg thread =
const mkcond : (mtx : mutex# -> cond)
const condwait : (cond : cond# -> void)
+ const condtimedwait : (cond : cond#, tmout : uint32 -> bool)
const condsignal : (cond : cond# -> void)
const condbroadcast : (cond : cond# -> void)
;;
/*
-The waitqueue is a doubly-linked list because we'll need to remove waiters from
-anywhere in the list when we add timeout support.
+The waitqueue is a doubly-linked list because we need to remove waiters from
+anywhere in the list when we timeout.
`cond._waitq.prev` is the tail of the queue.
*/
@@ -35,6 +36,10 @@ const mkcond = {mtx
}
const condwait = {cond
+ condtimedwait(cond, 0)
+}
+
+const condtimedwait = {cond, tmout
var mtx = cond._mtx
var lock = &cond._lock
var waiter = std.mk([.sem = mksem(0)])
@@ -49,12 +54,44 @@ const condwait = {cond
waiter.prev.next = waiter
q.prev = waiter
;;
-
mtxunlock(lock)
mtxunlock(mtx)
- semwait(&waiter.sem)
+
+ /*
+ If we time out, attempt to remove ourselves from the waitqueue. If we
+ get signalled before we can do this, consider ourselves successfully
+ woken regardless of how much time has elapsed.
+ */
+ if !semtimedwait(&waiter.sem, tmout)
+ var rc = false
+
+ mtxlock(lock)
+ if waiter.prev == Zptr /* Already shifted off the front */
+ rc = true
+ goto ret
+ ;;
+ if waiter.prev == waiter /* We are the only waiter in the queue */
+ cond._waitq = Zptr
+ goto ret
+ ;;
+
+ if waiter.prev.next == waiter
+ waiter.prev.next = waiter.next
+ ;;
+ if waiter.next != Zptr
+ waiter.next.prev = waiter.prev
+ else
+ cond._waitq.prev = waiter.prev
+ ;;
+:ret
+ mtxunlock(lock)
+ std.free(waiter)
+ -> rc
+ ;;
+
std.free(waiter)
mtxlock(mtx)
+ -> true
}
const condsignal = {cond
@@ -66,6 +103,7 @@ const condsignal = {cond
if head.next != Zptr
head.next.prev = head.prev
;;
+ head.prev = Zptr
cond._waitq = head.next
sempost(&head.sem)
;;
@@ -82,6 +120,7 @@ const condbroadcast = {cond
mtxlock(lock)
while (head = cond._waitq) != Zptr
+ head.prev = Zptr
cond._waitq = head.next
sempost(&head.sem)
;;
diff --git a/lib/thread/futex+freebsd.myr b/lib/thread/futex+freebsd.myr
index ea5f28d8..b291240f 100644
--- a/lib/thread/futex+freebsd.myr
+++ b/lib/thread/futex+freebsd.myr
@@ -1,3 +1,4 @@
+use std
use sys
use "atomic"
@@ -7,26 +8,43 @@ pkg thread =
type ftxtag = uint32
impl atomic ftxtag
- const ftxwait : (uaddr : ftxtag#, val : ftxtag, timeout : sys.timespec# -> int)
+ const ftxwait : (uaddr : ftxtag#, val : ftxtag, tmout : uint32 -> sys.errno)
const ftxwake : (uaddr : ftxtag# -> int)
const ftxwakeall : (uaddr : ftxtag# -> int)
;;
-const ftxwait = {uaddr, val, timeout
- if timeout == Zptr
- -> sys.umtx_op((uaddr : void#), sys.Umtxwaituintpriv, (val : uint64), Zptr, Zptr)
+const ftxwait = {uaddr, val, tmout
+ var ut : sys._umtx_time, utp, utsize, rc
+
+ if tmout == Zptr
+ utp = Zptr
+ utsize = Zptr
+ else
+ var t = (tmout : int64)
+ std.assert(sys.clock_gettime(`sys.Clockmonotonic, &ut._tmout) == 0,
+ "error: clock_gettime returned -1\n")
+ ut._tmout.nsec += (t % 1_000_000) * 1000
+ ut._tmout.sec += (ut._tmout.nsec / 1_000_000_000) + (t / 1_000_000)
+ ut._tmout.nsec %= 1_000_000_000
+ ut._flags = (sys.Umtxabstime : uint32),
+ ut._clockid = 1, /* CLOCK_MONOTONIC. Not exported from sys. */
+ utp = &ut
+ utsize = (sizeof(sys._umtx_time) : void#)
+ ;;
+
+ while ((rc = sys.umtx_op((uaddr : void#),
+ sys.Umtxwaituintpriv,
+ (val : uint64),
+ utsize,
+ utp) : sys.errno)) == sys.Eintr
;;
- var ut : sys._umtx_time = [
- ._timeout = timeout#,
- ._flags = (sys.Umtxabstime : uint32),
- ._clockid = 1, /* CLOCK_MONOTONIC. Not exported from sys. */
- ]
- -> sys.umtx_op((uaddr : void#),
- sys.Umtxwaituintpriv,
- (val : uint64),
- (sizeof(sys._umtx_time) : void#),
- (&ut : void#))
+ match rc
+ | 0: -> 0
+ | sys.Eagain: -> sys.Eagain
+ | sys.Etimedout: -> sys.Etimedout
+ | err: std.fatal("error: futex returned {}\n", err)
+ ;;
}
const ftxwake = {uaddr
diff --git a/lib/thread/futex+linux.myr b/lib/thread/futex+linux.myr
index c5dbf062..c5c3d56a 100644
--- a/lib/thread/futex+linux.myr
+++ b/lib/thread/futex+linux.myr
@@ -1,3 +1,4 @@
+use std
use sys
use "atomic"
@@ -7,26 +8,58 @@ pkg thread =
type ftxtag = uint32
impl atomic ftxtag
- const ftxwait : (uaddr : ftxtag#, val : ftxtag, timeout : sys.timespec# -> int)
+ const ftxwait : (uaddr : ftxtag#, val : ftxtag, tmout : uint32 -> sys.errno)
const ftxwake : (uaddr : ftxtag# -> int)
const ftxwakeall : (uaddr : ftxtag# -> int)
;;
-const ftxwait = {uaddr, val, timeout
- -> (sys.futex((uaddr : int32#),
- sys.Futexwait | sys.Futexpriv,
- (val : int32),
- timeout,
- Zptr,
- 0) : int)
+const ftxwait = {uaddr, val, tmout
+ var rc
+
+ if tmout == 0
+ while (rc = (sys.futex((uaddr : int32#),
+ sys.Futexwait | sys.Futexpriv,
+ (val : int32),
+ Zptr,
+ Zptr,
+ 0) : sys.errno)) == sys.Eintr
+ ;;
+ else
+ var t = (tmout : int64)
+ var ts
+ std.assert(sys.clock_gettime(`sys.Clockmonotonic, &ts) == 0,
+ "error: clock_gettime returned -1\n")
+ ts.nsec += (t % 1_000_000) * 1000
+ ts.sec += (ts.nsec / 1_000_000_000) + (t / 1_000_000)
+ ts.nsec %= 1_000_000_000
+
+ /*
+ Futexwaitbitset + Futexbitsetmatchany causes the timeout to be
+ treated as absolute rather than relative.
+ */
+ while (rc = (sys.futex((uaddr : int32#),
+ sys.Futexwaitbitset | sys.Futexpriv,
+ (val : int32),
+ &ts,
+ Zptr,
+ sys.Futexbitsetmatchany) : sys.errno)) == sys.Eintr
+ ;;
+ ;;
+
+ match rc
+ | 0: -> 0
+ | sys.Eagain: -> sys.Eagain
+ | sys.Etimedout: -> sys.Etimedout
+ | err: std.fatal("error: futex returned {}\n", err)
+ ;;
}
const ftxwake = {uaddr
- -> (sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 1, Zptr, Zptr, 0) : int)
+ -> sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 1, Zptr, Zptr, 0)
}
const ftxwakeall = {uaddr
- -> (sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 0x7fffffff, Zptr, Zptr, 0) : int)
+ -> sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 0x7fffffff, Zptr, Zptr, 0)
}
impl atomic ftxtag =
diff --git a/lib/thread/futex+openbsd:6.2.myr b/lib/thread/futex+openbsd:6.2.myr
index e3c9c413..87d0a049 100644
--- a/lib/thread/futex+openbsd:6.2.myr
+++ b/lib/thread/futex+openbsd:6.2.myr
@@ -1,3 +1,4 @@
+use std
use sys
use "atomic"
@@ -7,13 +8,62 @@ pkg thread =
type ftxtag = uint32
impl atomic ftxtag
- const ftxwait : (uaddr : ftxtag#, val : ftxtag, timeout : sys.timespec# -> int)
+ const ftxwait : (uaddr : ftxtag#, val : ftxtag, tmout : uint32 -> sys.errno)
const ftxwake : (uaddr : ftxtag# -> int)
const ftxwakeall : (uaddr : ftxtag# -> int)
;;
-const ftxwait = {uaddr, val, timeout
- -> sys.futex((uaddr : uint32#), sys.Futexwait, (val : int), timeout, Zptr)
+const ftxwait = {uaddr, val, tmout
+ var rc
+
+ if tmout == Zptr
+ while (rc = (sys.futex((uaddr : uint32#),
+ sys.Futexwait,
+ (val : int),
+ Zptr,
+ Zptr) : sys.errno)) == sys.Eintr
+ ;;
+ else
+ var t = (tmout : uint64)
+ var start
+ std.assert(sys.clock_gettime(`sys.Clockmonotonic, &start) == 0,
+ "error: clock_gettime returned -1\n")
+ var ts = [
+ .sec = t / 1_000_000
+ .nsec = (t % 1_000_000) * 1000
+ ]
+
+ while (rc = (sys.futex((uaddr : uint32#),
+ sys.Futexwait,
+ (val : int),
+ &ts,
+ Zptr) : sys.errno)) == sys.Eintr
+ var now
+ std.assert(sys.clock_gettime(`sys.Clockmonotonic, &now) == 0,
+ "error: clock_gettime returned -1\n")
+ var t1 = t - (((now.sec - start.sec) * 1_000_000) : uint32)
+ var nsec = now.nsec - start.nsec
+ if nsec >= 0
+ t1 -= (nsec / 1000 : uint32)
+ else
+ t1 -= ((1_000_000_000 + nsec) / 1000 : uint32)
+ ;;
+
+ if t1 > t
+ -> sys.Etimedout
+ ;;
+ ts.sec = t1 / 1_000_000
+ ts.nsec = (t1 % 1_000_000) * 1000
+ t = t1
+ ;;
+ ;;
+
+ match rc
+ | 0: -> 0
+ | sys.Eagain: -> sys.Eagain
+ | sys.Etimedout: -> sys.Etimedout
+ | err: std.fatal("error: futex returned {}\n", err)
+ ;;
}
const ftxwake = {uaddr
diff --git a/lib/thread/futex+osx.myr b/lib/thread/futex+osx.myr
index 17478703..167434f1 100644
--- a/lib/thread/futex+osx.myr
+++ b/lib/thread/futex+osx.myr
@@ -8,7 +8,7 @@ pkg thread =
type ftxtag = uint64
impl atomic ftxtag
- const ftxwait : (uaddr : ftxtag#, val : ftxtag, timeout : sys.timespec# -> int)
+ const ftxwait : (uaddr : ftxtag#, val : ftxtag, tmout : uint32 -> sys.errno)
const ftxwake : (uaddr : ftxtag# -> int)
const ftxwakeall : (uaddr : ftxtag# -> int)
;;
@@ -17,32 +17,52 @@ pkg thread =
* The ulock_ functions are undocumented but the relevant source can be found at
* https://github.com/apple/darwin-xnu/blob/0a798f6738bc1db01281fc08ae024145e84df927/bsd/kern/sys_ulock.c
*/
-const ftxwait = {uaddr, val, timeout
- if timeout == Zptr
- -> sys.ulock_wait(sys.Ulockcompareandwait, (uaddr : uint64#), (val : uint64), 0)
- ;;
+const ftxwait = {uaddr, val, tmout
+ var rc
- var ts
- var err = sys.clock_gettime(`sys.Clockmonotonic, &ts)
- std.assert(err == 0, "error: clock_gettime returned {}\n", err)
- if timeout.sec < ts.sec
- -> (std.Etimedout : int)
- ;;
+ if tmout == 0
+ while (rc = (sys.ulock_wait(sys.Ulockcompareandwait,
+ (uaddr : uint64#),
+ (val : uint64),
+ 0) : sys.errno)) == sys.Eintr
+ ;;
+ else
+ var start
+ std.assert(sys.clock_gettime(`sys.Clockmonotonic, &start) == 0,
+ "error: clock_gettime returned -1\n")
+
+ while (rc = (sys.ulock_wait(sys.Ulockcompareandwait,
+ (uaddr : uint64#),
+ (val : uint64),
+ tmout) : sys.errno)) == sys.Eintr
+ var now
+ std.assert(sys.clock_gettime(`sys.Clockmonotonic, &now) == 0,
+ "error: clock_gettime returned -1\n")
+ var t = tmout - (((now.sec - start.sec) * 1_000_000) : uint32)
+ var nsec = now.nsec - start.nsec
+ if nsec >= 0
+ t -= (nsec / 1000 : uint32)
+ else
+ t -= ((1_000_000_000 + nsec) / 1000 : uint32)
+ ;;
- var usec = 0
- var sec = (timeout.sec - ts.sec) * 1000
- std.assert(sec <= 0xffffffff, "error: maximum futex timeout exceeded\n")
- usec = (sec : uint32)
- if timeout.nsec > ts.nsec
- var nsec = (timeout.nsec - ts.nsec) / 1000
- std.assert(usec + nsec > usec, "error: maximum futex timeout exceeded\n")
- usec += nsec
+ if t > tmout
+ -> sys.Etimedout
+ ;;
+ tmout = t
+ ;;
;;
- if usec == 0
- -> (std.Etimedout : int)
+ match rc
+ | 0: -> 0
+ | sys.Eagain: -> sys.Eagain
+ | sys.Etimedout: -> sys.Etimedout
+ | err:
+ if err > 0
+ -> 0
+ ;;
+ std.fatal("error: ulock_wait returned {}\n", err)
;;
- -> sys.ulock_wait(sys.Ulockcompareandwait, (uaddr : uint64#), (val : uint64), usec)
}
const ftxwake = {uaddr
diff --git a/lib/thread/mutex+futex.myr b/lib/thread/mutex+futex.myr
index c8d40c61..72028f82 100644
--- a/lib/thread/mutex+futex.myr
+++ b/lib/thread/mutex+futex.myr
@@ -1,3 +1,5 @@
+use sys
+
use "atomic"
use "common"
use "futex"
@@ -10,6 +12,7 @@ pkg thread =
const mkmtx : (-> mutex)
const mtxlock : (mtx : mutex# -> void)
const mtxtrylock : (mtx : mutex# -> bool)
+ const mtxtimedlock : (mtx : mutex# , tmout : uint32 -> bool)
const mtxunlock : (mtx : mutex# -> void)
pkglocal const mtxcontended : (mtx : mutex# -> void)
@@ -26,16 +29,24 @@ const mkmtx = {
}
const mtxlock = {mtx
+ mtxtimedlock(mtx, 0)
+}
+
+const mtxtrylock = {mtx
+ -> xcas(&mtx._state, Unlocked, Locked) == Unlocked
+}
+
+const mtxtimedlock = {mtx, tmout
var c
- /*
+ /*
Uncontended case: we get an unlocked mutex, and we lock it.
*/
- c = Locked
+ c = Locked
for var i = 0; i < nspin; i++
- c = xcas(&mtx._state, Unlocked, Locked)
+ c = xcas(&mtx._state, Unlocked, Locked)
if c == Unlocked
- -> void
+ -> true
;;
;;
@@ -49,13 +60,12 @@ const mtxlock = {mtx
;;
while c != Unlocked
- ftxwait(&mtx._state, Contended, Zptr)
+ if ftxwait(&mtx._state, Contended, tmout) == sys.Etimedout
+ -> false
+ ;;
c = xchg(&mtx._state, Contended)
;;
-}
-
-const mtxtrylock = {mtx
- -> xcas(&mtx._state, Unlocked, Locked) == Unlocked
+ -> true
}
const mtxunlock = {mtx
@@ -76,6 +86,6 @@ const mtxunlock = {mtx
const mtxcontended = {mtx
while xchg(&mtx._state, Contended) != Unlocked
- ftxwait(&mtx._state, Contended, Zptr)
+ ftxwait(&mtx._state, Contended, 0)
;;
}
diff --git a/lib/thread/mutex+plan9.myr b/lib/thread/mutex+plan9.myr
index e2e1207f..c4fa5482 100644
--- a/lib/thread/mutex+plan9.myr
+++ b/lib/thread/mutex+plan9.myr
@@ -14,6 +14,7 @@ pkg thread =
const mkmtx : (-> mutex)
const mtxlock : (mtx : mutex# -> void)
const mtxtrylock : (mtx : mutex# -> bool)
+ const mtxtimedlock : (mtx : mutex# , tmout : uint32 -> bool)
const mtxunlock : (mtx : mutex# -> void)
;;
@@ -36,6 +37,18 @@ const mtxtrylock = {mtx
-> xcas(&mtx._state, 0, 1) == 0
}
+const mtxtimedlock = {mtx, tmout
+ var iters = tmout / 100
+
+ for var i = 0; i < iters; i++
+ if xcas(&mtx._state, 0, 1) == 0
+ -> true
+ ;;
+ std.nanosleep(90_000)
+ ;;
+ -> false
+}
+
const mtxunlock = {mtx
/* if we were the only thread waiting on the lock, there was no contention */
diff --git a/lib/thread/mutex.myr b/lib/thread/mutex.myr
index 5b905749..df1fdb88 100644
--- a/lib/thread/mutex.myr
+++ b/lib/thread/mutex.myr
@@ -13,6 +13,7 @@ pkg thread =
const mkmtx : (-> mutex)
const mtxlock : (mtx : mutex# -> void)
const mtxtrylock : (mtx : mutex# -> bool)
+ const mtxtimedlock : (mtx : mutex# , tmout : uint32 -> bool)
const mtxunlock : (mtx : mutex# -> void)
;;
@@ -59,6 +60,22 @@ const mtxtrylock = {mtx
-> xcas(&mtx._state, 0, 1) == 0
}
+const mtxtimedlock = {mtx, tmout
+ if tmout == 0
+ mtxlock(mtx)
+ -> true
+ ;;
+
+ var iters = tmout / 100
+ for var i = 0; i < iters; i++
+ if xcas(&mtx._state, 0, 1) == 0
+ -> true
+ ;;
+ std.nanosleep(90_000)
+ ;;
+ -> false
+}
+
const mtxunlock = {mtx
xset(&mtx._state, 0)
diff --git a/lib/thread/sem+futex.myr b/lib/thread/sem+futex.myr
index d79bd41b..6e91339b 100644
--- a/lib/thread/sem+futex.myr
+++ b/lib/thread/sem+futex.myr
@@ -1,4 +1,5 @@
use std
+use sys
use "atomic"
use "common"
@@ -12,6 +13,7 @@ pkg thread =
const mksem : (v : uint32 -> sem)
const semwait : (s : sem# -> void)
const semtrywait : (s : sem# -> bool)
+ const semtimedwait : (sem : sem#, tmout : uint32 -> bool)
const sempost : (s : sem# -> void)
;;
@@ -20,16 +22,7 @@ const mksem = {v
}
const semwait = {s
- var v = 0
-
- for ; ;
- while (v = s._val) > 0
- if xcas(&s._val, v, v - 1) == v
- -> void
- ;;
- ;;
- ftxwait(&s._val, v, Zptr)
- ;;
+ semtimedwait(s, 0)
}
const semtrywait = {s
@@ -45,6 +38,22 @@ const semtrywait = {s
-> false /* Unreachable */
}
+const semtimedwait = {s, tmout
+ var v = 0
+
+ for ; ;
+ while (v = s._val) > 0
+ if xcas(&s._val, v, v - 1) == v
+ -> true
+ ;;
+ ;;
+ if ftxwait(&s._val, v, tmout) == sys.Etimedout
+ -> false
+ ;;
+ ;;
+ -> false /* Unreachable */
+}
+
const sempost = {s
std.assert((xadd(&s._val, 1) : uint32) != ~0x0, "error: semaphore overflowed\n")
diff --git a/lib/thread/sem+plan9.myr b/lib/thread/sem+plan9.myr
index 221a8056..17b8d0c0 100644
--- a/lib/thread/sem+plan9.myr
+++ b/lib/thread/sem+plan9.myr
@@ -13,6 +13,7 @@ pkg thread =
const mksem : (v : uint32 -> sem)
const semwait : (s : sem# -> void)
const semtrywait : (s : sem# -> bool)
+ const semtimedwait : (sem : sem#, tmout : uint32 -> bool)
const sempost : (s : sem# -> void)
;;
@@ -46,6 +47,18 @@ const semtrywait = {s
-> false /* Unreachable */
}
+const semtimedwait = {mtx, tmout
+ var iters = tmout / 100
+
+ for var i = 0; i < iters; i++
+ if semtrywait(s)
+ -> true
+ ;;
+ std.nanosleep(90_000)
+ ;;
+ -> false
+}
+
const sempost = {s
var u = xadd(&s._user, 1)
std.assert(u != 0x7fffffff, "error: semaphore overflowed\n")
diff --git a/lib/thread/sem.myr b/lib/thread/sem.myr
index 7aea35f3..9d83a7ee 100644
--- a/lib/thread/sem.myr
+++ b/lib/thread/sem.myr
@@ -10,6 +10,7 @@ pkg thread =
const mksem : (v : uint32 -> sem)
const semwait : (s : sem# -> void)
const semtrywait : (s : sem# -> bool)
+ const semtimedwait : (sem : sem#, tmout : uint32 -> bool)
const sempost : (s : sem# -> void)
;;
@@ -61,6 +62,22 @@ const semtrywait = {s
-> false /* Unreachable */
}
+const semtimedwait = {s, tmout
+ if tmout == 0
+ semwait(s)
+ -> true
+ ;;
+
+ var iters = tmout / 100
+ for var i = 0; i < iters; i++
+ if semtrywait(s)
+ -> true
+ ;;
+ std.nanosleep(90_000)
+ ;;
+ -> false
+}
+
const sempost = {s
std.assert(xadd(&s._val, 1) != ~0x0, "error: semaphore overflowed\n")
}
diff --git a/lib/thread/test/condvar.myr b/lib/thread/test/condvar.myr
index 92380dc5..8a8e5039 100644
--- a/lib/thread/test/condvar.myr
+++ b/lib/thread/test/condvar.myr
@@ -33,6 +33,17 @@ const main = {
while nwoken != 100
thread.condbroadcast(&cv)
;;
+
+ thread.mtxlock(&mtx)
+ var start = std.now()
+ std.assert(!thread.condtimedwait(&cv, 1_000_000),
+ "condtimedwait is broken: expected sys.Etimedout\n")
+ var elapsed = std.now() - start
+ if elapsed < 900_000
+ std.fatal("condtimedwait returned early after {} microseconds\n", elapsed)
+ elif elapsed > 1_500_000
+ std.fatal("condtimedwait returned late after {} microseconds\n", elapsed)
+ ;;
}
const cvwait = {
diff --git a/lib/thread/test/mutex.myr b/lib/thread/test/mutex.myr
index fd58df13..a1041356 100644
--- a/lib/thread/test/mutex.myr
+++ b/lib/thread/test/mutex.myr
@@ -21,6 +21,17 @@ const main = {
if val != 1000 * 20
std.fatal("mutexes are broken, got {}\n", val)
;;
+
+ thread.mtxlock(&mtx)
+ var start = std.now()
+ std.assert(!thread.mtxtimedlock(&mtx, 1_000_000),
+ "mtxtimedlock is broken: expected sys.Etimedout\n")
+ var elapsed = std.now() - start
+ if elapsed < 900_000
+ std.fatal("mtxtimedlock returned early after {} microseconds\n", elapsed)
+ elif elapsed > 1_500_000
+ std.fatal("mtxtimedlock returned late after {} microseconds\n", elapsed)
+ ;;
}
const incvar = {
diff --git a/lib/thread/waitgrp+futex.myr b/lib/thread/waitgrp+futex.myr
index 7fbd5eb6..7db7dce2 100644
--- a/lib/thread/waitgrp+futex.myr
+++ b/lib/thread/waitgrp+futex.myr
@@ -22,7 +22,7 @@ const wgwait = {w
var v = 0
while (v = xget(&w._val)) != 0
- ftxwait(&w._val, v, Zptr)
+ ftxwait(&w._val, v, 0)
;;
}
--
2.18.0
- Prev by Date: [PATCH 1/2] Make timespec/timeval struct members signed to simplify arithmatic.
- Next by Date: Re: union member already defined?
- Previous by thread: [PATCH 1/2] Make timespec/timeval struct members signed to simplify arithmatic.
- Index(es):