Eigenstate: myrddin-dev mailing list

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 3/4] Add rwlocks.


I'm not 100% sure about the futex-based implementation, which is based on musl's. I kind of think it should favor writers more.

---
 lib/thread/bld.sub          |   2 +
 lib/thread/rwlock+futex.myr |  90 ++++++++++++++++++++++
 lib/thread/rwlock.myr       | 149 ++++++++++++++++++++++++++++++++++++
 lib/thread/test/mutex.myr   |  15 ++--
 lib/thread/test/rwlock.myr  |  50 ++++++++++++
 5 files changed, 297 insertions(+), 9 deletions(-)
 create mode 100644 lib/thread/rwlock+futex.myr
 create mode 100644 lib/thread/rwlock.myr
 create mode 100644 lib/thread/test/rwlock.myr

diff --git a/lib/thread/bld.sub b/lib/thread/bld.sub
index ed2ea2dc..eff3c5bd 100644
--- a/lib/thread/bld.sub
+++ b/lib/thread/bld.sub
@@ -6,11 +6,13 @@ lib thread =
 	condvar.myr
 	mutex.myr
 	ncpu.myr
+	rwlock.myr
 	sem.myr
 	waitgrp.myr
 
 	# futex-based impls
 	mutex+futex.myr
+	rwlock+futex.myr
 	sem+futex.myr
 	waitgrp+futex.myr
 
diff --git a/lib/thread/rwlock+futex.myr b/lib/thread/rwlock+futex.myr
new file mode 100644
index 00000000..a8aadf45
--- /dev/null
+++ b/lib/thread/rwlock+futex.myr
@@ -0,0 +1,129 @@
+use std
+
+use "atomic"
+use "futex"
+
+/*
+Futex-based reader/writer lock. The whole lock is a single futex word:
+the low 31 bits (Nrmask) count the readers holding the lock, or are all set
+while a writer holds it; the top bit (Wbit) is set by a thread right before
+it sleeps on the word, telling unlockers that a wakeup is needed.
+*/
+pkg thread =
+	type rwlock = struct
+		_state : ftxtag /* _nreaders:31 : uint32, _wbit:1 : uint32 */
+	;;
+
+	const mkrw       : (-> rwlock)
+	const rwrlock    : (rw : rwlock# -> void)
+	const rwwlock    : (rw : rwlock# -> void)
+	const rwtryrlock : (rw : rwlock# -> bool)
+	const rwtrywlock : (rw : rwlock# -> bool)
+	const rwrunlock  : (rw : rwlock# -> void)
+	const rwwunlock  : (rw : rwlock# -> void)
+;;
+
+const Nrmask = 0x7fffffff
+const Wbit   = 0x80000000
+
+/* Returns a new rwlock whose state word is 0 (no holder, no sleeper). */
+const mkrw = {
+	-> [._state = 0]
+}
+
+/*
+Acquires rw for reading. Several readers may hold the lock at once; the call
+waits for as long as a writer holds it. Dies if the reader count overflows.
+*/
+const rwrlock = {rw
+	for ; ;
+		var s = xget(&rw._state)
+		match s & Nrmask
+		| Nrmask - 1: std.die("error: rwlock overflowed\n")
+		| Nrmask:
+			/* Writer-held: record that someone is waiting, then sleep. */
+			if xcas(&rw._state, s, Nrmask | Wbit) == s
+				ftxwait(&rw._state, Nrmask | Wbit, 0)
+			;;
+		| _:
+			if xcas(&rw._state, s, s + 1) == s
+				-> void
+			;;
+		;;
+	;;
+}
+
+/*
+Acquires rw for writing, waiting until no reader or writer holds it.
+*/
+const rwwlock = {rw
+	for ; ;
+		var s = xcas(&rw._state, 0, Nrmask)
+		if s == 0
+			-> void
+		;;
+
+		/* Held by someone else: set Wbit on the observed state and sleep. */
+		if xcas(&rw._state, s, s | Wbit) == s
+			ftxwait(&rw._state, s | Wbit, 0)
+		;;
+	;;
+}
+
+/*
+Tries to acquire rw for reading without sleeping. Returns true on success
+and false if a writer holds the lock. Dies if the reader count overflows.
+*/
+const rwtryrlock = {rw
+	for ; ;
+		var s = xget(&rw._state)
+		match s & Nrmask
+		| Nrmask - 1: std.die("error: rwlock overflowed\n")
+		| Nrmask: -> false
+		| _:
+			/* Loop again only when the CAS lost a race with another thread. */
+			if xcas(&rw._state, s, s + 1) == s
+				-> true
+			;;
+		;;
+	;;
+	-> false /* Unreachable */
+}
+
+/*
+Tries to acquire rw for writing without sleeping. Succeeds only if the state
+word is exactly 0, i.e. no holder and no Wbit.
+*/
+const rwtrywlock = {rw
+	-> xcas(&rw._state, 0, Nrmask) == 0
+}
+
+/*
+Releases a read lock. The reader that drops the count to zero while Wbit is
+set clears the bit and wakes the sleepers.
+
+Every sleeper must be woken, not just one: clearing Wbit erases the only
+record that anyone is asleep. If a single writer were woken, it could take
+and release the lock without ever seeing Wbit, and the remaining sleepers
+would never be woken. Woken threads that still cannot get the lock set Wbit
+again before going back to sleep.
+*/
+const rwrunlock = {rw
+	var prev = xadd(&rw._state, -1)
+	std.assert(prev & Nrmask != 0, "error: rwlock underflowed\n")
+	if prev & Nrmask == 1 && prev & Wbit != 0
+		if xcas(&rw._state, Wbit, 0) == Wbit
+			ftxwakeall(&rw._state)
+		;;
+	;;
+}
+
+/*
+Releases a write lock. Resets the state to 0 and, if any thread flagged that
+it was going to sleep, wakes all of them so they can retry.
+*/
+const rwwunlock = {rw
+	if xchg(&rw._state, 0) & Wbit != 0
+		ftxwakeall(&rw._state) /* Just Do It(tm). */
+	;;
+}
diff --git a/lib/thread/rwlock.myr b/lib/thread/rwlock.myr
new file mode 100644
index 00000000..a8241218
--- /dev/null
+++ b/lib/thread/rwlock.myr
@@ -0,0 +1,190 @@
+use std
+
+use "common"
+use "mutex"
+use "sem"
+
+pkg thread =
+	type rwlock = struct
+		_head  : rwwaiter#
+		_tail  : rwwaiter#
+		_lock  : mutex
+		_state : uint32 /* _nreaders:31 : uint32, _wbit:1 : uint32 */
+	;;
+
+	const mkrw       : (-> rwlock)
+	const rwrlock    : (rw : rwlock# -> void)
+	const rwwlock    : (rw : rwlock# -> void)
+	const rwtryrlock : (rw : rwlock# -> bool)
+	const rwtrywlock : (rw : rwlock# -> bool)
+	const rwrunlock  : (rw : rwlock# -> void)
+	const rwwunlock  : (rw : rwlock# -> void)
+;;
+
+/*
+We can't use a condvar here if we want to avoid heap allocations, which we do,
+if only for consistency with the futex based implementation.
+
+NOTE(review): the waiters are still heap-allocated with std.mk below, which
+contradicts the goal stated above -- confirm whether they can live on the
+waiting thread's stack instead.
+
+Waiters form a singly linked queue running from rwlock._head to rwlock._tail;
+both pointers are Zptr exactly when the queue is empty.
+*/
+type rwwaiter = struct
+	next : rwwaiter#
+	sem : sem
+;;
+
+const Nrmask = 0x7fffffff
+const Wbit   = 0x80000000
+
+/* Returns a new rwlock; only the mutex is initialized explicitly. */
+const mkrw = {
+	-> [._lock = mkmtx()]
+}
+
+/*
+Acquires rw for reading. Several readers may hold the lock at once; while a
+writer holds it, the caller queues itself behind the other waiters and
+sleeps on its semaphore. Dies if the reader count overflows.
+*/
+const rwrlock = {rw
+	for ; ;
+		mtxlock(&rw._lock)
+		match rw._state & Nrmask
+		| Nrmask - 1: std.die("error: rwlock overflowed\n")
+		| Nrmask:
+			rw._state |= Wbit
+			var waiter = std.mk([.sem = mksem(0)])
+			/* Append; the first waiter must also become the head. */
+			if rw._tail != Zptr
+				rw._tail.next = waiter
+			else
+				rw._head = waiter
+			;;
+			rw._tail = waiter
+
+			mtxunlock(&rw._lock)
+			semwait(&waiter.sem)
+			std.free(waiter)
+		| _:
+			rw._state++
+			mtxunlock(&rw._lock)
+			-> void
+		;;
+	;;
+}
+
+/*
+Acquires rw for writing, waiting until no reader or writer holds it.
+*/
+const rwwlock = {rw
+	for ; ;
+		mtxlock(&rw._lock)
+		if rw._state == 0
+			rw._state = Nrmask
+			mtxunlock(&rw._lock)
+			-> void
+		;;
+
+		/* Favor writers over readers to avoid writer starvation. */
+		rw._state |= Wbit
+		var waiter = std.mk([.sem = mksem(0)])
+		waiter.next = rw._head
+		rw._head = waiter
+		/* The first waiter is the tail too, so later appends link to it. */
+		if rw._tail == Zptr
+			rw._tail = waiter
+		;;
+
+		mtxunlock(&rw._lock)
+		semwait(&waiter.sem)
+		std.free(waiter)
+	;;
+}
+
+/*
+Tries to acquire rw for reading without waiting for a writer to release it.
+Returns true on success and false if a writer holds the lock. Dies if the
+reader count overflows.
+*/
+const rwtryrlock = {rw
+	mtxlock(&rw._lock)
+	match rw._state & Nrmask
+	| Nrmask - 1: std.die("error: rwlock overflowed\n")
+	| Nrmask:
+		mtxunlock(&rw._lock)
+		-> false
+	| _:
+		rw._state++
+		mtxunlock(&rw._lock)
+		-> true
+	;;
+}
+
+/*
+Tries to acquire rw for writing without waiting for it to be released.
+Succeeds only if the state is exactly 0, i.e. no holder and no Wbit.
+*/
+const rwtrywlock = {rw
+	var rc
+	mtxlock(&rw._lock)
+	if rw._state == 0
+		rw._state = Nrmask
+		rc = true
+	else
+		rc = false
+	;;
+	mtxunlock(&rw._lock)
+	-> rc
+}
+
+/*
+Releases a read lock. The reader that drops the count to zero while Wbit is
+set resets the state and pops the waiter at the head of the queue; that
+waiter's semaphore is posted after the mutex has been dropped.
+*/
+const rwrunlock = {rw
+	mtxlock(&rw._lock)
+	var prev = rw._state--
+	std.assert(prev & Nrmask != 0, "error: rwlock underflowed\n")
+
+	var writer = Zptr
+	if prev & Nrmask == 1 && prev & Wbit != 0
+		if rw._state == Wbit
+			rw._state = 0
+			if rw._head != Zptr
+				writer = rw._head
+				rw._head = rw._head.next
+				if rw._tail == writer
+					rw._tail = Zptr
+				;;
+			;;
+		;;
+	;;
+	mtxunlock(&rw._lock)
+
+	if writer != Zptr
+		sempost(&writer.sem)
+	;;
+}
+
+/*
+Releases a write lock and wakes every queued waiter so that they can retry.
+
+The queue itself is checked rather than Wbit: rwrunlock clears Wbit when it
+wakes a single writer, so waiters may remain queued while the bit is unset.
+Keying the wakeup on Wbit would leave those waiters asleep forever.
+*/
+const rwwunlock = {rw
+	mtxlock(&rw._lock)
+	var head = Zptr
+	while (head = rw._head) != Zptr
+		rw._head = head.next
+		sempost(&head.sem)
+	;;
+	rw._tail = Zptr
+	rw._state = 0
+	mtxunlock(&rw._lock)
+}
diff --git a/lib/thread/test/mutex.myr b/lib/thread/test/mutex.myr
index fd58df13..30624449 100644
--- a/lib/thread/test/mutex.myr
+++ b/lib/thread/test/mutex.myr
@@ -6,19 +6,16 @@ use thrtestutil
 const Nherd = 20
 
 var val : uint64 = 0
-var done : uint32 = 0
 var mtx : thread.mutex
+var done
 
 const main = {
-	done = 0
-	val = 0
-
 	mtx = thread.mkmtx()
+	done = thread.mkwg(Nherd)
+
 	thrtestutil.mkherd(Nherd, incvar)
-	while thread.xget(&done) != Nherd
-		/* nothing */
-	;;
-	if val != 1000 * 20
+	thread.wgwait(&done)
+	if val != 1000 * (Nherd : uint64)
 		std.fatal("mutexes are broken, got {}\n", val)
 	;;
 }
@@ -29,5 +26,5 @@ const incvar = {
 		val++
 		thread.mtxunlock(&mtx)
 	;;
-	thread.xadd(&done, 1)
+	thread.wgpost(&done)
 }
diff --git a/lib/thread/test/rwlock.myr b/lib/thread/test/rwlock.myr
new file mode 100644
index 00000000..94f69c24
--- /dev/null
+++ b/lib/thread/test/rwlock.myr
@@ -0,0 +1,50 @@
+use std
+use thread
+
+use thrtestutil
+/* Nherd: size passed to each mkherd call; Nloops: increments per writer thread. */
+const Nherd = 20
+const Nloops = 100_000
+/* val is only written under the write lock; nreaders/nwriters track who is inside. */
+var val : uint64
+var nreaders : uint32
+var nwriters : uint32
+var rw : thread.rwlock
+var done
+/* Starts the reader and writer herds, waits on the waitgroup, then checks val. */
+const main = {
+	rw = thread.mkrw()
+	done = thread.mkwg(Nherd)
+
+	thrtestutil.mkherd(Nherd, read)
+	thrtestutil.mkherd(Nherd, incvar)
+	thread.wgwait(&done)
+	if val != Nloops * (Nherd : uint64)
+		std.fatal("rwlocks are broken, got {}\n", val)
+	;;
+}
+/* Writer: increments val Nloops times under the write lock, asserting no reader is in. */
+const incvar = {
+	for var i = 0; i < Nloops; i++
+		thread.rwwlock(&rw)
+		thread.xadd(&nwriters, 1)
+		std.assert(thread.xget(&nreaders) == 0, "incvar: rwlocks are broken\n")
+		val++
+		thread.xadd(&nwriters, -1)
+		thread.rwwunlock(&rw)
+	;;
+	std.put("done\n")
+	thread.wgpost(&done)
+}
+/* Reader: takes the read lock and asserts no writer is in, until done._val reads 0. */
+const read = {
+	/* Linux seems to not want to end the process when there are still running threads. */
+	while thread.xget(&done._val) != 0
+		thread.rwrlock(&rw)
+		thread.xadd(&nreaders, 1)
+		std.assert(thread.xget(&nwriters) == 0, "read: rwlocks are broken\n")
+		thread.xadd(&nreaders, -1)
+		thread.rwrunlock(&rw)
+		std.usleep(1000)
+	;;
+}
-- 
2.18.0


Follow-Ups:
Re: [PATCH 3/4] Add rwlocks.Ori Bernstein <ori@xxxxxxxxxxxxxx>