Eigenstate: myrddin-dev mailing list

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Subject: [PATCH 2/2] Add/fix condvar implementations.


---
 lib/thread/bld.sub                 | 22 ++++----
 lib/thread/condvar+freebsd.myr     | 33 +++++-------
 lib/thread/condvar+linux.myr       | 18 +++----
 lib/thread/condvar+openbsd:6.2.myr | 56 +++++++++++++++++++
 lib/thread/condvar+osx.myr         | 53 ++++++++++++++++++
 lib/thread/condvar.myr             | 87 ++++++++++++++++++++++++++++++
 lib/thread/mutex+futex.myr         | 20 ++++---
 lib/thread/test/condvar.myr        | 46 ++++++----------
 8 files changed, 263 insertions(+), 72 deletions(-)
 create mode 100644 lib/thread/condvar+openbsd:6.2.myr
 create mode 100644 lib/thread/condvar+osx.myr
 create mode 100644 lib/thread/condvar.myr

diff --git a/lib/thread/bld.sub b/lib/thread/bld.sub
index a0c1a745..ed2ea2dc 100644
--- a/lib/thread/bld.sub
+++ b/lib/thread/bld.sub
@@ -1,25 +1,28 @@
 lib thread =
 	common.myr
 	hookstd.myr	# install thread hooks
+
+	# generic fallbacks
+	condvar.myr
+	mutex.myr
+	ncpu.myr
+	sem.myr
+	waitgrp.myr
+
+	# futex-based impls
 	mutex+futex.myr
 	sem+futex.myr
 	waitgrp+futex.myr
-	mutex.myr	# fallback, for unimplemented platforms
-	sem.myr		# fallback, for unimplemented platforms
-	waitgrp.myr	# fallback, for unimplemented platforms
-
-	#generic fallbacks
-	ncpu.myr
 
 	# linux impl of basic thread primitives
-	#condvar+linux.myr
+	condvar+linux.myr
 	exit+linux-x64.s
 	futex+linux.myr
 	ncpu+linux.myr
 	spawn+linux.myr
 
 	# freebsd impl of thread primitives
-	#condvar+freebsd.myr
+	condvar+freebsd.myr
 	exit+freebsd-x64.s
 	futex+freebsd.myr
 	ncpu+freebsd.myr
@@ -33,7 +36,7 @@ lib thread =
 	#exit+netbsd-x64.s
 
 	# osx impl of thread primitives
-	#condvar+osx.myr
+	condvar+osx.myr
 	futex+osx.myr
 	spawn+osx.myr
 	start+osx-x64.s
@@ -47,6 +50,7 @@ lib thread =
 	spawn+plan9.myr
 
 	# openbsd impl of thread primitives
+	condvar+openbsd:6.2.myr
 	exit+openbsd-x64.s
 	futex+openbsd:6.2.myr
 	ncpu+openbsd.myr
diff --git a/lib/thread/condvar+freebsd.myr b/lib/thread/condvar+freebsd.myr
index 65267a4a..002757ae 100644
--- a/lib/thread/condvar+freebsd.myr
+++ b/lib/thread/condvar+freebsd.myr
@@ -1,14 +1,14 @@
 use std
-use sys
 
 use "atomic"
 use "common"
 use "mutex"
+use "futex"
 
 pkg thread =
 	type cond = struct
 		_mtx	: mutex#
-		_seq	: uint32
+		_seq	: ftxtag
 	;;
 
 	const mkcond	: (mtx : mutex# -> cond)
@@ -27,33 +27,28 @@ const condwait = {cond
 
 	mtx = cond._mtx
 	seq = cond._seq
-
 	mtxunlock(mtx)
-	sys.umtx_op((&cond._seq : void#), \
-		sys.Umtxwaituintpriv, \
-		(seq : uint64), \
-		Zptr, Zptr)
 
 	/*
-	We need to atomically set the mutex to contended. This allows us to
-	pass responsibility for waking up the potential other waiters on to the
-	unlocker of the mutex.
+	FIXME?: `ftxwait` can be interrupted but `condwait` should always be
+	done in a loop anyway.
 	*/
-	while xchg(&mtx._state, Contended) != Unlocked
-		sys.umtx_op((&mtx._state : void#), \
-			sys.Umtxwaituintpriv, \
-			(Contended : uint64), \
-			Zptr, Zptr)
-	;;
+	ftxwait(&cond._seq, seq, Zptr)
+
+	mtxlock(mtx)
 }
 
 const condsignal = {cond : cond#
 	xadd(&cond._seq, 1)
-	sys.umtx_op((&cond._seq : void#), sys.Umtxwakepriv, 1, Zptr, Zptr)
+	ftxwake(&cond._seq)
 }
 
+/*
+`umtx_op` fully supports implementing condvars efficiently but also requires
+condvars to be implemented in a specific way. For now we'll just invite the
+thundering herd.
+*/
 const condbroadcast = {cond : cond#
 	xadd(&cond._seq, 1)
-	sys.umtx_op((&cond._seq : void#), sys.Umtxwakepriv, 0x7ffffff, Zptr, Zptr)
+	ftxwakeall(&cond._seq)
 }
-
diff --git a/lib/thread/condvar+linux.myr b/lib/thread/condvar+linux.myr
index cd4ab8c8..e1a9e100 100644
--- a/lib/thread/condvar+linux.myr
+++ b/lib/thread/condvar+linux.myr
@@ -27,8 +27,12 @@ const condwait = {cond
 
 	mtx = cond._mtx
 	seq = cond._seq
-
 	mtxunlock(mtx)
+
+	/*
+	FIXME?: `futex` can be interrupted but `condwait` should always be done
+	in a loop anyway.
+	*/
 	sys.futex(&cond._seq, sys.Futexwait | sys.Futexpriv, seq, Zptr, Zptr, 0)
 
 	/*
@@ -36,10 +40,7 @@ const condwait = {cond
 	pass responsibility for waking up the potential other waiters on to the
 	unlocker of the mutex.
 	*/
-	while xchg(&mtx._state, Contended) != Unlocked
-		sys.futex(&mtx._state, sys.Futexwait | sys.Futexpriv, \
-			Contended, Zptr, Zptr, 0)
-	;;
+	mtxcontended(mtx)
 }
 
 const condsignal = {cond : cond#
@@ -54,8 +55,7 @@ const condbroadcast = {cond : cond#
 	used for the number of threads to move, and is not ignored when
 	requeueing
 	*/
-	sys.futex(&cond._seq, sys.Futexcmprequeue | sys.Futexpriv, \
-		1, (0x7fffffff : sys.timespec#), \
-		&cond._mtx._state, cond._seq)
+	sys.futex(&cond._seq, sys.Futexrequeue | sys.Futexpriv,
+		1, (0x7fffffff : sys.timespec#),
+		(&cond._mtx._state : int32#), 0)
 }
-
diff --git a/lib/thread/condvar+openbsd:6.2.myr b/lib/thread/condvar+openbsd:6.2.myr
new file mode 100644
index 00000000..c72d0ee2
--- /dev/null
+++ b/lib/thread/condvar+openbsd:6.2.myr
@@ -0,0 +1,56 @@
+use std
+use sys
+
+use "atomic"
+use "common"
+use "mutex"
+
+pkg thread =
+	type cond = struct
+		_mtx	: mutex#
+		_seq	: uint32
+	;;
+
+	const mkcond	: (mtx : mutex# -> cond)
+	const condwait	: (cond : cond# -> void)
+	const condsignal	: (cond : cond# -> void)
+	const condbroadcast	: (cond : cond# -> void)
+;;
+
+const mkcond = {mtx
+	-> [._mtx = mtx, ._seq = 0]
+}
+
+const condwait = {cond
+	var seq
+	var mtx
+
+	mtx = cond._mtx
+	seq = cond._seq
+	mtxunlock(mtx)
+
+	/*
+	FIXME?: `futex` can be interrupted but `condwait` should always be done
+	in a loop anyway.
+	*/
+	sys.futex(&cond._seq, sys.Futexwait, seq, Zptr, Zptr)
+
+	/*
+	We need to atomically set the mutex to contended. This allows us to
+	pass responsibility for waking up the potential other waiters on to the
+	unlocker of the mutex.
+	*/
+	mtxcontended(mtx)
+}
+
+const condsignal = {cond : cond#
+	xadd(&cond._seq, 1)
+	sys.futex(&cond._seq, sys.Futexwake, 1, Zptr, Zptr)
+}
+
+const condbroadcast = {cond : cond#
+	xadd(&cond._seq, 1)
+	sys.futex(&cond._seq, sys.Futexrequeue, 1,
+		(0x7fffffff : sys.timespec#),
+		(&cond._mtx._state : uint32#))
+}
diff --git a/lib/thread/condvar+osx.myr b/lib/thread/condvar+osx.myr
new file mode 100644
index 00000000..d74c321f
--- /dev/null
+++ b/lib/thread/condvar+osx.myr
@@ -0,0 +1,53 @@
+use std
+
+use "atomic"
+use "common"
+use "mutex"
+use "futex"
+
+pkg thread =
+	type cond = struct
+		_mtx	: mutex#
+		_seq	: ftxtag
+	;;
+
+	const mkcond	: (mtx : mutex# -> cond)
+	const condwait	: (cond : cond# -> void)
+	const condsignal	: (cond : cond# -> void)
+	const condbroadcast	: (cond : cond# -> void)
+;;
+
+const mkcond = {mtx
+	-> [._mtx = mtx, ._seq = 0]
+}
+
+const condwait = {cond
+	var seq
+	var mtx
+
+	mtx = cond._mtx
+	seq = cond._seq
+	mtxunlock(mtx)
+
+	/*
+	FIXME?: `ftxwait` can be interrupted but `condwait` should always be
+	done in a loop anyway.
+	*/
+	ftxwait(&cond._seq, seq, Zptr)
+
+	mtxlock(mtx)
+}
+
+const condsignal = {cond : cond#
+	xadd(&cond._seq, 1)
+	ftxwake(&cond._seq)
+}
+
+/*
+Yes, this invites the thundering herd but that's what OS X gets for not having
+a requeue operation.
+*/
+const condbroadcast = {cond : cond#
+	xadd(&cond._seq, 1)
+	ftxwakeall(&cond._seq)
+}
diff --git a/lib/thread/condvar.myr b/lib/thread/condvar.myr
new file mode 100644
index 00000000..9649b920
--- /dev/null
+++ b/lib/thread/condvar.myr
@@ -0,0 +1,87 @@
+use "atomic"
+use "common"
+use "mutex"
+use "sem"
+
+pkg thread =
+	type cond = struct
+		_mtx	: mutex#
+		_waitq	: condwaiter#
+		_lock	: mutex
+	;;
+
+	const mkcond	: (mtx : mutex# -> cond)
+	const condwait	: (cond : cond# -> void)
+	const condsignal	: (cond : cond# -> void)
+	const condbroadcast	: (cond : cond# -> void)
+;;
+
+/*
+The waitqueue is a doubly-linked list because we'll need to remove waiters from
+anywhere in the list when we add timeout support.
+
+`cond._waitq.prev` is the tail of the queue.
+*/
+type condwaiter = struct
+	next : condwaiter#
+	prev : condwaiter#
+	sem : sem
+;;
+
+const mkcond = {mtx
+	-> [._mtx = mtx, ._lock = mkmtx()]
+}
+
+const condwait = {cond
+	var mtx = cond._mtx
+	var lock = &cond._lock
+	var waiter = [.sem = mksem(0)]
+
+	mtxlock(lock)
+	match cond._waitq
+	| Zptr:
+		waiter.prev = &waiter
+		cond._waitq = &waiter
+	| q:
+		waiter.prev = q.prev
+		waiter.prev.next = &waiter
+		q.prev = &waiter
+	;;
+
+	mtxunlock(lock)
+	mtxunlock(mtx)
+	semwait(&waiter.sem)
+
+	mtxlock(mtx)
+}
+
+const condsignal = {cond
+	var lock = &cond._lock
+
+	mtxlock(lock)
+	var head = cond._waitq
+	if head != Zptr
+		if head.next != Zptr
+			head.next.prev = head.prev
+		;;
+		cond._waitq = head.next
+		sempost(&head.sem)
+	;;
+	mtxunlock(lock)
+}
+
+/*
+Yes, this invites the thundering herd but that's what you get for not
+supporting futexes at all.
+*/
+const condbroadcast = {cond
+	var lock = &cond._lock
+	var head = Zptr
+
+	mtxlock(lock)
+	while (head = cond._waitq) != Zptr
+		cond._waitq = head.next
+		sempost(&head.sem)
+	;;
+	mtxunlock(lock)
+}
diff --git a/lib/thread/mutex+futex.myr b/lib/thread/mutex+futex.myr
index 5878755a..c8d40c61 100644
--- a/lib/thread/mutex+futex.myr
+++ b/lib/thread/mutex+futex.myr
@@ -12,11 +12,13 @@ pkg thread =
 	const mtxtrylock	: (mtx : mutex# -> bool)
 	const mtxunlock	: (mtx : mutex# -> void)
 
-	pkglocal const Unlocked = 0
-	pkglocal const Locked = 1
-	pkglocal const Contended = 2
+	pkglocal const mtxcontended	: (mtx : mutex# -> void)
 ;;
 
+const Unlocked = 0
+const Locked = 1
+const Contended = 2
+
 var nspin = 10	/* FIXME: pick a sane number, based on CPU count */
 
 const mkmtx = {
@@ -38,9 +40,9 @@ const mtxlock = {mtx
 	;;
 
 	/*
-	Contended case: we set the lock state to Contended. This indicates that there
-	the lock is locked, and we potentially have threads waiting on it, which means
-	that we will need to wake them up.
+	Contended case: we set the lock state to Contended. This indicates that
+	the lock is locked, and we potentially have threads waiting on it,
+	which means that we will need to wake them up.
 	*/
 	if c == Locked
 		c = xchg(&mtx._state, Contended)
@@ -71,3 +73,9 @@ const mtxunlock = {mtx
 	/* wake one thread */
 	ftxwake(&mtx._state)
 }
+
+const mtxcontended = {mtx
+	while xchg(&mtx._state, Contended) != Unlocked
+		ftxwait(&mtx._state, Contended, Zptr)
+	;;
+}
diff --git a/lib/thread/test/condvar.myr b/lib/thread/test/condvar.myr
index 0f1de58e..92380dc5 100644
--- a/lib/thread/test/condvar.myr
+++ b/lib/thread/test/condvar.myr
@@ -1,55 +1,44 @@
 use std
 use thread
 
-use "util"
+use thrtestutil
 
 const Nwakes = 1000
 
 var cv
+var cv1
 var mtx
 var val
 
-var done : int32
+var ready
 var nwoken : int32
-var nready : int32
-var locked : int32
 
 const main = {
-	done = 0
+	ready = thread.mkwg(2)
 	val = 123
 
 	mtx = thread.mkmtx()
 	cv = thread.mkcond(&mtx)
+	cv1 = thread.mkcond(&mtx)
 	thread.spawn(cvwait)
 	thread.spawn(cvwake)
-	while done == 0
-		/* nothing */
-	;;
+	thread.wgwait(&ready)
 	std.assert(nwoken == Nwakes, "wrong number of wakes")
-	std.assert(val == 123, "wrong val after all are done")
 
+	ready = thread.mkwg(100)
 	nwoken = 0
-	nready = 0
-	mkherd(100, cvwaitonce)
+	thrtestutil.mkherd(100, cvwaitonce)
 
-	/* wait until the herd is ready */
-	while nready != 100	/* 0 to 99 */
-		/* nothing */
-	;;
-	while locked == 0
-		/* nothing */
-	;;
-	thread.condbroadcast(&cv)
+	thread.wgwait(&ready)
 	while nwoken != 100
-		/* nothing */
+		thread.condbroadcast(&cv)
 	;;
-	std.assert(nwoken == 100, "wrong thread count woken")
-
 }
 
 const cvwait = {
 	for var i = 0; i < Nwakes; i++
 		thread.mtxlock(&mtx)
+		thread.condsignal(&cv1)
 		thread.condwait(&cv)
 		std.assert(val == 456, "wrong val after signal\n")
 		val = 123
@@ -57,29 +46,28 @@ const cvwait = {
 
 		thread.xadd(&nwoken, 1)
 	;;
-	val = 123
-	thread.xadd(&done, 1)
-
+	thread.condsignal(&cv1)
+	thread.wgpost(&ready)
 }
 
 const cvwake = {
 	while true
 		thread.mtxlock(&mtx)
 		val = 456
+		thread.condsignal(&cv)
+		thread.condwait(&cv1)
 		thread.mtxunlock(&mtx)
 
-		thread.condsignal(&cv)
 		if nwoken >= Nwakes
 			break
 		;;
 	;;
+	thread.wgpost(&ready)
 }
 
 const cvwaitonce = {
-	thread.xadd(&nready, 1)
-
 	thread.mtxlock(&mtx)
-	thread.xadd(&locked, 1)
+	thread.wgpost(&ready)
 	thread.condwait(&cv)
 	thread.mtxunlock(&mtx)
 
-- 
2.18.0


Follow-Ups:
Re: Subject: [PATCH 2/2] Add/fix condvar implementations. — Ori Bernstein <ori@xxxxxxxxxxxxxx>