Eigenstate: myrddin-dev mailing list

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Subject: [PATCH 2/2] Add/fix condvar implementations.


Hm. I've never really liked condition variables -- every time I use them, I
find myself thinking that there has to be a nicer abstraction, and I wonder
if we can think a bit and find some better alternative.

Still, they're pretty common and widely used, and it's better to have them
working if the code is checked in. Landed both of these changes.

When you get a chance, send a doc update.

On Mon, 23 Jul 2018 23:20:20 -0700, iriri <iri@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx> wrote:

> ---
>  lib/thread/bld.sub                 | 22 ++++----
>  lib/thread/condvar+freebsd.myr     | 33 +++++-------
>  lib/thread/condvar+linux.myr       | 18 +++----
>  lib/thread/condvar+openbsd:6.2.myr | 56 +++++++++++++++++++
>  lib/thread/condvar+osx.myr         | 53 ++++++++++++++++++
>  lib/thread/condvar.myr             | 87 ++++++++++++++++++++++++++++++
>  lib/thread/mutex+futex.myr         | 20 ++++---
>  lib/thread/test/condvar.myr        | 46 ++++++----------
>  8 files changed, 263 insertions(+), 72 deletions(-)
>  create mode 100644 lib/thread/condvar+openbsd:6.2.myr
>  create mode 100644 lib/thread/condvar+osx.myr
>  create mode 100644 lib/thread/condvar.myr
> 
> diff --git a/lib/thread/bld.sub b/lib/thread/bld.sub
> index a0c1a745..ed2ea2dc 100644
> --- a/lib/thread/bld.sub
> +++ b/lib/thread/bld.sub
> @@ -1,25 +1,28 @@
>  lib thread =
>  	common.myr
>  	hookstd.myr	# install thread hooks
> +
> +	# generic fallbacks
> +	condvar.myr
> +	mutex.myr
> +	ncpu.myr
> +	sem.myr
> +	waitgrp.myr
> +
> +	# futex-based impls
>  	mutex+futex.myr
>  	sem+futex.myr
>  	waitgrp+futex.myr
> -	mutex.myr	# fallback, for unimplemented platforms
> -	sem.myr		# fallback, for unimplemented platforms
> -	waitgrp.myr	# fallback, for unimplemented platforms
> -
> -	#generic fallbacks
> -	ncpu.myr
>  
>  	# linux impl of basic thread primitives
> -	#condvar+linux.myr
> +	condvar+linux.myr
>  	exit+linux-x64.s
>  	futex+linux.myr
>  	ncpu+linux.myr
>  	spawn+linux.myr
>  
>  	# freebsd impl of thread primitives
> -	#condvar+freebsd.myr
> +	condvar+freebsd.myr
>  	exit+freebsd-x64.s
>  	futex+freebsd.myr
>  	ncpu+freebsd.myr
> @@ -33,7 +36,7 @@ lib thread =
>  	#exit+netbsd-x64.s
>  
>  	# osx impl of thread primitives
> -	#condvar+osx.myr
> +	condvar+osx.myr
>  	futex+osx.myr
>  	spawn+osx.myr
>  	start+osx-x64.s
> @@ -47,6 +50,7 @@ lib thread =
>  	spawn+plan9.myr
>  
>  	# openbsd impl of thread primitives
> +	condvar+openbsd:6.2.myr
>  	exit+openbsd-x64.s
>  	futex+openbsd:6.2.myr
>  	ncpu+openbsd.myr
> diff --git a/lib/thread/condvar+freebsd.myr b/lib/thread/condvar+freebsd.myr
> index 65267a4a..002757ae 100644
> --- a/lib/thread/condvar+freebsd.myr
> +++ b/lib/thread/condvar+freebsd.myr
> @@ -1,14 +1,14 @@
>  use std
> -use sys
>  
>  use "atomic"
>  use "common"
>  use "mutex"
> +use "futex"
>  
>  pkg thread =
>  	type cond = struct
>  		_mtx	: mutex#
> -		_seq	: uint32
> +		_seq	: ftxtag
>  	;;
>  
>  	const mkcond	: (mtx : mutex# -> cond)
> @@ -27,33 +27,28 @@ const condwait = {cond
>  
>  	mtx = cond._mtx
>  	seq = cond._seq
> -
>  	mtxunlock(mtx)
> -	sys.umtx_op((&cond._seq : void#), \
> -		sys.Umtxwaituintpriv, \
> -		(seq : uint64), \
> -		Zptr, Zptr)
>  
>  	/*
> -	We need to atomically set the mutex to contended. This allows us to
> -	pass responsibility for waking up the potential other waiters on to the
> -	unlocker of the mutex.
> +	FIXME?: `ftxwait` can be interrupted but `condwait` should always be
> +	done in a loop anyway.
>  	*/
> -	while xchg(&mtx._state, Contended) != Unlocked
> -		sys.umtx_op((&mtx._state : void#), \
> -			sys.Umtxwaituintpriv, \
> -			(Contended : uint64), \
> -			Zptr, Zptr)
> -	;;
> +	ftxwait(&cond._seq, seq, Zptr)
> +
> +	mtxlock(mtx)
>  }
>  
>  const condsignal = {cond : cond#
>  	xadd(&cond._seq, 1)
> -	sys.umtx_op((&cond._seq : void#), sys.Umtxwakepriv, 1, Zptr, Zptr)
> +	ftxwake(&cond._seq)
>  }
>  
> +/*
> +`umtx_op` fully supports implementing condvars efficiently but also requires
> +condvars to be implemented in a specific way. For now we'll just invite the
> +thundering herd.
> +*/
>  const condbroadcast = {cond : cond#
>  	xadd(&cond._seq, 1)
> -	sys.umtx_op((&cond._seq : void#), sys.Umtxwakepriv, 0x7ffffff, Zptr, Zptr)
> +	ftxwakeall(&cond._seq)
>  }
> -
> diff --git a/lib/thread/condvar+linux.myr b/lib/thread/condvar+linux.myr
> index cd4ab8c8..e1a9e100 100644
> --- a/lib/thread/condvar+linux.myr
> +++ b/lib/thread/condvar+linux.myr
> @@ -27,8 +27,12 @@ const condwait = {cond
>  
>  	mtx = cond._mtx
>  	seq = cond._seq
> -
>  	mtxunlock(mtx)
> +
> +	/*
> +	FIXME?: `futex` can be interrupted but `condwait` should always be done
> +	in a loop anyway.
> +	*/
>  	sys.futex(&cond._seq, sys.Futexwait | sys.Futexpriv, seq, Zptr, Zptr, 0)
>  
>  	/*
> @@ -36,10 +40,7 @@ const condwait = {cond
>  	pass responsibility for waking up the potential other waiters on to the
>  	unlocker of the mutex.
>  	*/
> -	while xchg(&mtx._state, Contended) != Unlocked
> -		sys.futex(&mtx._state, sys.Futexwait | sys.Futexpriv, \
> -			Contended, Zptr, Zptr, 0)
> -	;;
> +	mtxcontended(mtx)
>  }
>  
>  const condsignal = {cond : cond#
> @@ -54,8 +55,7 @@ const condbroadcast = {cond : cond#
>  	used for the number of threads to move, and is not ignored when
>  	requeueing
>  	*/
> -	sys.futex(&cond._seq, sys.Futexcmprequeue | sys.Futexpriv, \
> -		1, (0x7fffffff : sys.timespec#), \
> -		&cond._mtx._state, cond._seq)
> +	sys.futex(&cond._seq, sys.Futexrequeue | sys.Futexpriv,
> +		1, (0x7fffffff : sys.timespec#),
> +		(&cond._mtx._state : int32#), 0)
>  }
> -
> diff --git a/lib/thread/condvar+openbsd:6.2.myr b/lib/thread/condvar+openbsd:6.2.myr
> new file mode 100644
> index 00000000..c72d0ee2
> --- /dev/null
> +++ b/lib/thread/condvar+openbsd:6.2.myr
> @@ -0,0 +1,56 @@
> +use std
> +use sys
> +
> +use "atomic"
> +use "common"
> +use "mutex"
> +
> +pkg thread =
> +	type cond = struct
> +		_mtx	: mutex#
> +		_seq	: uint32
> +	;;
> +
> +	const mkcond	: (mtx : mutex# -> cond)
> +	const condwait	: (cond : cond# -> void)
> +	const condsignal	: (cond : cond# -> void)
> +	const condbroadcast	: (cond : cond# -> void)
> +;;
> +
> +const mkcond = {mtx
> +	-> [._mtx = mtx, ._seq = 0]
> +}
> +
> +const condwait = {cond
> +	var seq
> +	var mtx
> +
> +	mtx = cond._mtx
> +	seq = cond._seq
> +	mtxunlock(mtx)
> +
> +	/*
> +	FIXME?: `futex` can be interrupted but `condwait` should always be done
> +	in a loop anyway.
> +	*/
> +	sys.futex(&cond._seq, sys.Futexwait, seq, Zptr, Zptr)
> +
> +	/*
> +	We need to atomically set the mutex to contended. This allows us to
> +	pass responsibility for waking up the potential other waiters on to the
> +	unlocker of the mutex.
> +	*/
> +	mtxcontended(mtx)
> +}
> +
> +const condsignal = {cond : cond#
> +	xadd(&cond._seq, 1)
> +	sys.futex(&cond._seq, sys.Futexwake, 1, Zptr, Zptr)
> +}
> +
> +const condbroadcast = {cond : cond#
> +	xadd(&cond._seq, 1)
> +	sys.futex(&cond._seq, sys.Futexrequeue, 1,
> +		(0x7fffffff : sys.timespec#),
> +		(&cond._mtx._state : uint32#))
> +}
> diff --git a/lib/thread/condvar+osx.myr b/lib/thread/condvar+osx.myr
> new file mode 100644
> index 00000000..d74c321f
> --- /dev/null
> +++ b/lib/thread/condvar+osx.myr
> @@ -0,0 +1,53 @@
> +use std
> +
> +use "atomic"
> +use "common"
> +use "mutex"
> +use "futex"
> +
> +pkg thread =
> +	type cond = struct
> +		_mtx	: mutex#
> +		_seq	: ftxtag
> +	;;
> +
> +	const mkcond	: (mtx : mutex# -> cond)
> +	const condwait	: (cond : cond# -> void)
> +	const condsignal	: (cond : cond# -> void)
> +	const condbroadcast	: (cond : cond# -> void)
> +;;
> +
> +const mkcond = {mtx
> +	-> [._mtx = mtx, ._seq = 0]
> +}
> +
> +const condwait = {cond
> +	var seq
> +	var mtx
> +
> +	mtx = cond._mtx
> +	seq = cond._seq
> +	mtxunlock(mtx)
> +
> +	/*
> +	FIXME?: `ftxwait` can be interrupted but `condwait` should always be
> +	done in a loop anyway.
> +	*/
> +	ftxwait(&cond._seq, seq, Zptr)
> +
> +	mtxlock(mtx)
> +}
> +
> +const condsignal = {cond : cond#
> +	xadd(&cond._seq, 1)
> +	ftxwake(&cond._seq)
> +}
> +
> +/*
> +Yes, this invites the thundering herd but that's what OS X gets for not having
> +a requeue operation.
> +*/
> +const condbroadcast = {cond : cond#
> +	xadd(&cond._seq, 1)
> +	ftxwakeall(&cond._seq)
> +}
> diff --git a/lib/thread/condvar.myr b/lib/thread/condvar.myr
> new file mode 100644
> index 00000000..9649b920
> --- /dev/null
> +++ b/lib/thread/condvar.myr
> @@ -0,0 +1,87 @@
> +use "atomic"
> +use "common"
> +use "mutex"
> +use "sem"
> +
> +pkg thread =
> +	type cond = struct
> +		_mtx	: mutex#
> +		_waitq	: condwaiter#
> +		_lock	: mutex
> +	;;
> +
> +	const mkcond	: (mtx : mutex# -> cond)
> +	const condwait	: (cond : cond# -> void)
> +	const condsignal	: (cond : cond# -> void)
> +	const condbroadcast	: (cond : cond# -> void)
> +;;
> +
> +/*
> +The waitqueue is a doubly-linked list because we'll need to remove waiters from
> +anywhere in the list when we add timeout support.
> +
> +`cond._waitq.prev` is the tail of the queue.
> +*/
> +type condwaiter = struct
> +	next : condwaiter#
> +	prev : condwaiter#
> +	sem : sem
> +;;
> +
> +const mkcond = {mtx
> +	-> [._mtx = mtx, ._lock = mkmtx()]
> +}
> +
> +const condwait = {cond
> +	var mtx = cond._mtx
> +	var lock = &cond._lock
> +	var waiter = [.sem = mksem(0)]
> +
> +	mtxlock(lock)
> +	match cond._waitq
> +	| Zptr:
> +		waiter.prev = &waiter
> +		cond._waitq = &waiter
> +	| q:
> +		waiter.prev = q.prev
> +		waiter.prev.next = &waiter
> +		q.prev = &waiter
> +	;;
> +
> +	mtxunlock(lock)
> +	mtxunlock(mtx)
> +	semwait(&waiter.sem)
> +
> +	mtxlock(mtx)
> +}
> +
> +const condsignal = {cond
> +	var lock = &cond._lock
> +
> +	mtxlock(lock)
> +	var head = cond._waitq
> +	if head != Zptr
> +		if head.next != Zptr
> +			head.next.prev = head.prev
> +		;;
> +		cond._waitq = head.next
> +		sempost(&head.sem)
> +	;;
> +	mtxunlock(lock)
> +}
> +
> +/*
> +Yes, this invites the thundering herd but that's what you get for not
> +supporting futexes at all.
> +*/
> +const condbroadcast = {cond
> +	var lock = &cond._lock
> +	var head = Zptr
> +
> +	mtxlock(lock)
> +	while (head = cond._waitq) != Zptr
> +		cond._waitq = head.next
> +		sempost(&head.sem)
> +	;;
> +	mtxunlock(lock)
> +}
> diff --git a/lib/thread/mutex+futex.myr b/lib/thread/mutex+futex.myr
> index 5878755a..c8d40c61 100644
> --- a/lib/thread/mutex+futex.myr
> +++ b/lib/thread/mutex+futex.myr
> @@ -12,11 +12,13 @@ pkg thread =
>  	const mtxtrylock	: (mtx : mutex# -> bool)
>  	const mtxunlock	: (mtx : mutex# -> void)
>  
> -	pkglocal const Unlocked = 0
> -	pkglocal const Locked = 1
> -	pkglocal const Contended = 2
> +	pkglocal const mtxcontended	: (mtx : mutex# -> void)
>  ;;
>  
> +const Unlocked = 0
> +const Locked = 1
> +const Contended = 2
> +
>  var nspin = 10	/* FIXME: pick a sane number, based on CPU count */
>  
>  const mkmtx = {
> @@ -38,9 +40,9 @@ const mtxlock = {mtx
>  	;;
>  
>  	/*
> -	Contended case: we set the lock state to Contended. This indicates that there
> -	the lock is locked, and we potentially have threads waiting on it, which means
> -	that we will need to wake them up.
> +	Contended case: we set the lock state to Contended. This indicates that
> +	the lock is locked, and we potentially have threads waiting on it,
> +	which means that we will need to wake them up.
>  	*/
>  	if c == Locked
>  		c = xchg(&mtx._state, Contended)
> @@ -71,3 +73,9 @@ const mtxunlock = {mtx
>  	/* wake one thread */
>  	ftxwake(&mtx._state)
>  }
> +
> +const mtxcontended = {mtx
> +	while xchg(&mtx._state, Contended) != Unlocked
> +		ftxwait(&mtx._state, Contended, Zptr)
> +	;;
> +}
> diff --git a/lib/thread/test/condvar.myr b/lib/thread/test/condvar.myr
> index 0f1de58e..92380dc5 100644
> --- a/lib/thread/test/condvar.myr
> +++ b/lib/thread/test/condvar.myr
> @@ -1,55 +1,44 @@
>  use std
>  use thread
>  
> -use "util"
> +use thrtestutil
>  
>  const Nwakes = 1000
>  
>  var cv
> +var cv1
>  var mtx
>  var val
>  
> -var done : int32
> +var ready
>  var nwoken : int32
> -var nready : int32
> -var locked : int32
>  
>  const main = {
> -	done = 0
> +	ready = thread.mkwg(2)
>  	val = 123
>  
>  	mtx = thread.mkmtx()
>  	cv = thread.mkcond(&mtx)
> +	cv1 = thread.mkcond(&mtx)
>  	thread.spawn(cvwait)
>  	thread.spawn(cvwake)
> -	while done == 0
> -		/* nothing */
> -	;;
> +	thread.wgwait(&ready)
>  	std.assert(nwoken == Nwakes, "wrong number of wakes")
> -	std.assert(val == 123, "wrong val after all are done")
>  
> +	ready = thread.mkwg(100)
>  	nwoken = 0
> -	nready = 0
> -	mkherd(100, cvwaitonce)
> +	thrtestutil.mkherd(100, cvwaitonce)
>  
> -	/* wait until the herd is ready */
> -	while nready != 100	/* 0 to 99 */
> -		/* nothing */
> -	;;
> -	while locked == 0
> -		/* nothing */
> -	;;
> -	thread.condbroadcast(&cv)
> +	thread.wgwait(&ready)
>  	while nwoken != 100
> -		/* nothing */
> +		thread.condbroadcast(&cv)
>  	;;
> -	std.assert(nwoken == 100, "wrong thread count woken")
> -
>  }
>  
>  const cvwait = {
>  	for var i = 0; i < Nwakes; i++
>  		thread.mtxlock(&mtx)
> +		thread.condsignal(&cv1)
>  		thread.condwait(&cv)
>  		std.assert(val == 456, "wrong val after signal\n")
>  		val = 123
> @@ -57,29 +46,28 @@ const cvwait = {
>  
>  		thread.xadd(&nwoken, 1)
>  	;;
> -	val = 123
> -	thread.xadd(&done, 1)
> -
> +	thread.condsignal(&cv1)
> +	thread.wgpost(&ready)
>  }
>  
>  const cvwake = {
>  	while true
>  		thread.mtxlock(&mtx)
>  		val = 456
> +		thread.condsignal(&cv)
> +		thread.condwait(&cv1)
>  		thread.mtxunlock(&mtx)
>  
> -		thread.condsignal(&cv)
>  		if nwoken >= Nwakes
>  			break
>  		;;
>  	;;
> +	thread.wgpost(&ready)
>  }
>  
>  const cvwaitonce = {
> -	thread.xadd(&nready, 1)
> -
>  	thread.mtxlock(&mtx)
> -	thread.xadd(&locked, 1)
> +	thread.wgpost(&ready)
>  	thread.condwait(&cv)
>  	thread.mtxunlock(&mtx)
>  
> -- 
> 2.18.0
> 
> 


-- 
    Ori Bernstein

References:
Subject: [PATCH 2/2] Add/fix condvar implementations.iriri <iri@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx>