Eigenstate: myrddin-dev mailing list

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 3/4] Add rwlocks.


Updating the first three patches.

Changes:
- Copied all of the relevant timespec, futex, etc. changes to
  `/support/syscall-gen/`.

- Added comment to FreeBSD and OS X condvar implementations about reacquiring
  the lock after being signalled.
- Simplified the fallback condvar implementation.
- The `ftxwait` timeout parameter is now a `std.time`. Negative values mean no
  timeout.

- Replaced the fallback rwlock implementation with a much simpler one.
- Added comments to the rwlock implementations and made the requested changes
  to the function names.

---
 lib/thread/bld.sub          |   2 +
 lib/thread/rwlock+futex.myr | 135 ++++++++++++++++++++++++++++++++++++
 lib/thread/rwlock.myr       |  78 +++++++++++++++++++++
 lib/thread/test/mutex.myr   |  15 ++--
 lib/thread/test/rwlock.myr  |  49 +++++++++++++
 5 files changed, 270 insertions(+), 9 deletions(-)
 create mode 100644 lib/thread/rwlock+futex.myr
 create mode 100644 lib/thread/rwlock.myr
 create mode 100644 lib/thread/test/rwlock.myr

diff --git a/lib/thread/bld.sub b/lib/thread/bld.sub
index ed2ea2dc..eff3c5bd 100644
--- a/lib/thread/bld.sub
+++ b/lib/thread/bld.sub
@@ -6,11 +6,13 @@ lib thread =
 	condvar.myr
 	mutex.myr
 	ncpu.myr
+	rwlock.myr
 	sem.myr
 	waitgrp.myr
 
 	# futex-based impls
 	mutex+futex.myr
+	rwlock+futex.myr
 	sem+futex.myr
 	waitgrp+futex.myr
 
diff --git a/lib/thread/rwlock+futex.myr b/lib/thread/rwlock+futex.myr
new file mode 100644
index 00000000..4975c953
--- /dev/null
+++ b/lib/thread/rwlock+futex.myr
@@ -0,0 +1,135 @@
+use std
+
+use "atomic"
+use "futex"
+
+pkg thread =
+	/*
+	The 31 low bits of `_state` contain either the number of readers
+	holding the lock, or `0x7fffffff` if the lock is held by a single
+	writer.
+
+	The high bit is set if there are any waiting readers or writers.
+	*/
+	type rwlock = struct
+		_state : ftxtag
+	;;
+
+	const mkrwlock  : (-> rwlock)
+	const rdlock    : (rw : rwlock# -> void)
+	const wrlock    : (rw : rwlock# -> void)
+	const tryrdlock : (rw : rwlock# -> bool)
+	const trywrlock : (rw : rwlock# -> bool)
+	const rdunlock  : (rw : rwlock# -> void)
+	const wrunlock  : (rw : rwlock# -> void)
+;;
+
+const Nrmask  = 0x7fffffff
+const Waitbit = 0x80000000
+
+const mkrwlock = {
+	-> [._state = 0]
+}
+
+const rdlock = {rw
+	for ; ;
+		var s = xget(&rw._state)
+		match s & Nrmask
+		| Nrmask - 1: std.die("error: rwlock overflowed\n")
+		| Nrmask:
+			/*
+			The lock is held by a writer so we attempt to CAS in
+			the wait bit and wait. If the CAS fails, the state of
+			the lock has changed so we can try once again to
+			acquire it.
+			*/
+			if xcas(&rw._state, s, Nrmask | Waitbit) == s
+				ftxwait(&rw._state, Nrmask | Waitbit, 0)
+			;;
+		| _:
+			/*
+			Otherwise the lock is either unlocked or held by some
+			number of readers. Either way we simply increment the
+			reader count via CAS.
+			*/
+			if xcas(&rw._state, s, s + 1) == s
+				-> void
+			;;
+		;;
+	;;
+}
+
+const wrlock = {rw
+	for ; ;
+		/*
+		`_state` must be 0 for a writer to acquire the lock. Anything
+		else means the lock is either held or in the process of being
+		released by the last reader.
+		 */
+		var s = xcas(&rw._state, 0, Nrmask)
+		if s == 0
+			-> void
+		;;
+
+		/*
+		If we fail to acquire the lock, attempt to CAS in the wait bit
+		and wait. It the CAS fails, the state of the lock has changed
+		so we can try once again to acquire it.
+		*/
+		if xcas(&rw._state, s, s | Waitbit) == s
+			ftxwait(&rw._state, s | Waitbit, 0)
+		;;
+	;;
+}
+
+const tryrdlock = {rw
+	for ; ;
+		var s = xget(&rw._state)
+		match s & Nrmask
+		| Nrmask - 1: std.die("error: rwlock overflowed\n")
+		| Nrmask: -> false
+		| _:
+			if xcas(&rw._state, s, s + 1) == s
+				-> true
+			;;
+		;;
+	;;
+	-> false /* Unreachable */
+}
+
+const trywrlock = {rw
+	-> xcas(&rw._state, 0, Nrmask) == 0
+}
+
+const rdunlock = {rw
+	/*
+	Only the last reader needs to potentially wake a writer so all other
+	readers can get away with simply decrementing the reader count via FAA.
+	*/
+	var prev = xadd(&rw._state, -1)
+	std.assert(prev & Nrmask != 0, "error: rwlock underflowed\n")
+	if prev & Nrmask == 1 && prev & Waitbit != 0
+		/*
+		If there are one or more waiting writers and no other readers
+		have acquired the lock since we fetched the reader count, then
+		the value of `_state` is guaranteed to be `Waitbit`. If the CAS
+		succeeds, we wake one of those writers.
+		*/
+		if xcas(&rw._state, Waitbit, 0) == Waitbit
+			ftxwake(&rw._state)
+		;;
+	;;
+}
+
+const wrunlock = {rw
+	/*
+	If the wait bit was set then there are one or more waiting readers,
+	writers, or both. In the first and third cases, we need to wake
+	everyone; in the second, we'd like to just wake one thread. However, we
+	currently have no way of knowing which case we're in so we always have
+	to wake everyone.
+	*/
+	if xchg(&rw._state, 0) & Waitbit != 0
+		ftxwakeall(&rw._state)
+	;;
+}
diff --git a/lib/thread/rwlock.myr b/lib/thread/rwlock.myr
new file mode 100644
index 00000000..5545638d
--- /dev/null
+++ b/lib/thread/rwlock.myr
@@ -0,0 +1,78 @@
+use "mutex"
+
+pkg thread =
+	/*
+	`_lock` grants exclusive access to either n readers or one writer.
+
+	`_rlock` protects `_rcount` and allows the first reader to attempt to
+	lock `_lockk` without letting other readers in.
+	*/
+	type rwlock = struct
+		_lock   : mutex
+		_rlock  : mutex
+		_rcount : uint32
+	;;
+
+	const mkrwlock  : (-> rwlock)
+	const rdlock    : (rw : rwlock# -> void)
+	const wrlock    : (rw : rwlock# -> void)
+	const tryrdlock : (rw : rwlock# -> bool)
+	const trywrlock : (rw : rwlock# -> bool)
+	const rdunlock  : (rw : rwlock# -> void)
+	const wrunlock  : (rw : rwlock# -> void)
+;;
+
+const mkrwlock = {
+	-> [._lock = mkmtx(), ._rlock = mkmtx()]
+}
+
+const rdlock = {rw
+	mtxlock(&rw._rlock)
+
+	/*
+	The first reader either successfully acquires `_lock`, locking out all
+	writers, or blocks while holding `_rlock`, locking out all other
+	readers until it is able to acquire `_lock`, meaning that a writer has
+	released it.
+	*/
+	if rw._rcount++ == 0
+		mtxlock(&rw._lock)
+	;;
+	mtxunlock(&rw._rlock)
+}
+
+const wrlock = {rw
+	mtxlock(&rw._lock)
+}
+
+const tryrdlock = {rw
+	var rc = true
+	mtxlock(&rw._rlock)
+	if rw._rcount++ == 0
+		if !mtxtrylock(&rw._lock)
+			rw._rcount--
+			rc = false
+		;;
+	;;
+	mtxunlock(&rw._rlock)
+	-> rc
+}
+
+const trywrlock = {rw
+	-> mtxtrylock(&rw._lock)
+}
+
+const rdunlock = {rw
+	mtxlock(&rw._rlock)
+
+	/* `_lock` is not released until the last reader releases the lock. */
+	if --rw._rcount == 0
+		mtxunlock(&rw._lock)
+	;;
+	mtxunlock(&rw._rlock)
+
+}
+
+const wrunlock = {rw
+	mtxunlock(&rw._lock)
+}
diff --git a/lib/thread/test/mutex.myr b/lib/thread/test/mutex.myr
index fd58df13..30624449 100644
--- a/lib/thread/test/mutex.myr
+++ b/lib/thread/test/mutex.myr
@@ -6,19 +6,16 @@ use thrtestutil
 const Nherd = 20
 
 var val : uint64 = 0
-var done : uint32 = 0
 var mtx : thread.mutex
+var done
 
 const main = {
-	done = 0
-	val = 0
-
 	mtx = thread.mkmtx()
+	done = thread.mkwg(Nherd)
+
 	thrtestutil.mkherd(Nherd, incvar)
-	while thread.xget(&done) != Nherd
-		/* nothing */
-	;;
-	if val != 1000 * 20
+	thread.wgwait(&done)
+	if val != 1000 * (Nherd : uint64)
 		std.fatal("mutexes are broken, got {}\n", val)
 	;;
 }
@@ -29,5 +26,5 @@ const incvar = {
 		val++
 		thread.mtxunlock(&mtx)
 	;;
-	thread.xadd(&done, 1)
+	thread.wgpost(&done)
 }
diff --git a/lib/thread/test/rwlock.myr b/lib/thread/test/rwlock.myr
new file mode 100644
index 00000000..857549c2
--- /dev/null
+++ b/lib/thread/test/rwlock.myr
@@ -0,0 +1,49 @@
+use std
+use thread
+
+use thrtestutil
+
+const Nherd = 20
+const Nloops = 100_000
+
+var val : uint64
+var nreaders : uint32
+var nwriters : uint32
+var rw
+var done
+
+const main = {
+	rw = thread.mkrwlock()
+	done = thread.mkwg(Nherd)
+
+	thrtestutil.mkherd(Nherd, read)
+	thrtestutil.mkherd(Nherd, incvar)
+	thread.wgwait(&done)
+	if val != Nloops * (Nherd : uint64)
+		std.fatal("rwlocks are broken, got {}\n", val)
+	;;
+}
+
+const incvar = {
+	for var i = 0; i < Nloops; i++
+		thread.wrlock(&rw)
+		thread.xadd(&nwriters, 1)
+		std.assert(thread.xget(&nreaders) == 0, "incvar: rwlocks are broken\n")
+		val++
+		thread.xadd(&nwriters, -1)
+		thread.wrunlock(&rw)
+	;;
+	thread.wgpost(&done)
+}
+
+const read = {
+	/* Linux seems to not want to end the process when there are still running threads. */
+	while thread.xget(&done._val) != 0
+		thread.rdlock(&rw)
+		thread.xadd(&nreaders, 1)
+		std.assert(thread.xget(&nwriters) == 0, "read: rwlocks are broken\n")
+		thread.xadd(&nreaders, -1)
+		thread.rdunlock(&rw)
+		std.usleep(1000)
+	;;
+}
-- 
2.18.0



References:
[PATCH 3/4] Add rwlocks.iriri <iri@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx>
Re: [PATCH 3/4] Add rwlocks.Ori Bernstein <ori@xxxxxxxxxxxxxx>