
[v3,05/14] locking/qspinlock: Remove unbounded cmpxchg loop from locking slowpath

Message ID 1524738868-31318-6-git-send-email-will.deacon@arm.com
State Accepted
Commit 59fb586b4a07b4e1a0ee577140ab4842ba451acd
Series kernel/locking: qspinlock improvements

Commit Message

Will Deacon April 26, 2018, 10:34 a.m. UTC
The qspinlock locking slowpath utilises a "pending" bit as a simple form
of an embedded test-and-set lock that can avoid the overhead of explicit
queuing in cases where the lock is held but uncontended. This bit is
managed using a cmpxchg loop which tries to transition the uncontended
lock word from (0,0,0) -> (0,0,1) or (0,0,1) -> (0,1,1).

Unfortunately, the cmpxchg loop is unbounded and lockers can be starved
indefinitely if the lock word is seen to oscillate between unlocked
(0,0,0) and locked (0,0,1). This could happen if concurrent lockers are
able to take the lock in the cmpxchg loop without queuing and pass it
around amongst themselves.

This patch fixes the problem by unconditionally setting _Q_PENDING_VAL
using atomic_fetch_or, and then inspecting the old value to see whether
we need to spin on the current lock owner, or whether we now effectively
hold the lock. The tricky scenario is when concurrent lockers end up
queuing on the lock and the lock becomes available, causing us to see
a lockword of (n,0,0). With pending now set, simply queuing could lead
to deadlock as the head of the queue may not have observed the pending
flag being cleared. Conversely, if the head of the queue did observe
pending being cleared, then it could transition the lock from (n,0,0) ->
(0,0,1) meaning that any attempt to "undo" our setting of the pending
bit could race with a concurrent locker trying to set it.

We handle this race by preserving the pending bit when taking the lock
after reaching the head of the queue and leaving the tail entry intact
if we saw pending set, because we know that the tail is going to be
updated shortly.

Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Ingo Molnar <mingo@kernel.org>
Signed-off-by: Will Deacon <will.deacon@arm.com>

---
 kernel/locking/qspinlock.c          | 102 ++++++++++++++++++++----------------
 kernel/locking/qspinlock_paravirt.h |   5 --
 2 files changed, 58 insertions(+), 49 deletions(-)

-- 
2.1.4
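
For quick reference, the reworked trylock/pending path reads roughly as follows once the hunks below are applied. This is a condensed sketch of the slowpath entry, with the fast path and the MCS queueing code after the queue label omitted; it is not a standalone, compilable function:

void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
	/* Any tail or pending bits set: don't compete for pending, just queue. */
	if (val & ~_Q_LOCKED_MASK)
		goto queue;

	/* One bounded atomic RmW replaces the old unbounded cmpxchg loop. */
	val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
	if (!(val & ~_Q_LOCKED_MASK)) {
		/* We set pending on an uncontended word: wait for the owner... */
		if (val & _Q_LOCKED_MASK)
			smp_cond_load_acquire(&lock->val.counter,
					      !(VAL & _Q_LOCKED_MASK));

		/* ...then take ownership and drop pending: *,1,0 -> *,0,1 */
		clear_pending_set_locked(lock);
		return;
	}

	/*
	 * Someone else got in between our first check and the fetch_or
	 * (queued or set pending).  Undo the pending bit only if it was
	 * clear in the snapshot, i.e. only if we were the one to set it.
	 */
	if (!(val & _Q_PENDING_MASK))
		clear_pending(lock);

queue:
	/* MCS queueing, as in the full diff at the bottom of this page. */
	...
}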

Comments

Peter Zijlstra April 26, 2018, 3:53 p.m. UTC | #1
On Thu, Apr 26, 2018 at 11:34:19AM +0100, Will Deacon wrote:
> @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
>  	}
>  
>  	/*
> +	 * If we observe any contention; queue.
> +	 */
> +	if (val & ~_Q_LOCKED_MASK)
> +		goto queue;
> +
> +	/*
>  	 * trylock || pending
>  	 *
>  	 * 0,0,0 -> 0,0,1 ; trylock
>  	 * 0,0,1 -> 0,1,1 ; pending
>  	 */
> +	val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
> +	if (!(val & ~_Q_LOCKED_MASK)) {
>  		/*
> +		 * we're pending, wait for the owner to go away.
> +		 *
> +		 * *,1,1 -> *,1,0

Tail must be 0 here, right?

> +		 *
> +		 * this wait loop must be a load-acquire such that we match the
> +		 * store-release that clears the locked bit and create lock
> +		 * sequentiality; this is because not all
> +		 * clear_pending_set_locked() implementations imply full
> +		 * barriers.
>  		 */
> +		if (val & _Q_LOCKED_MASK) {
> +			smp_cond_load_acquire(&lock->val.counter,
> +					      !(VAL & _Q_LOCKED_MASK));
> +		}
>  
>  		/*
> +		 * take ownership and clear the pending bit.
> +		 *
> +		 * *,1,0 -> *,0,1
>  		 */

Idem.

> +		clear_pending_set_locked(lock);
>  		return;
> +	}
>  
>  	/*
> +	 * If pending was clear but there are waiters in the queue, then
> +	 * we need to undo our setting of pending before we queue ourselves.
>  	 */
> +	if (!(val & _Q_PENDING_MASK))
> +		clear_pending(lock);

This is the branch for when we have !0 tail.

>  
>  	/*
>  	 * End of pending bit optimistic spinning and beginning of MCS

> @@ -445,15 +459,15 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
>  	 * claim the lock:
>  	 *
>  	 * n,0,0 -> 0,0,1 : lock, uncontended
> +	 * *,*,0 -> *,*,1 : lock, contended
>  	 *
> +	 * If the queue head is the only one in the queue (lock value == tail)
> +	 * and nobody is pending, clear the tail code and grab the lock.
> +	 * Otherwise, we only need to grab the lock.
>  	 */
>  	for (;;) {
>  		/* In the PV case we might already have _Q_LOCKED_VAL set */
> +		if ((val & _Q_TAIL_MASK) != tail || (val & _Q_PENDING_MASK)) {
>  			set_locked(lock);
>  			break;
>  		}

This one hunk is terrible on the brain. I'm fairly sure I get it, but I
feel that comment can use help. Or at least, I need help reading it.

I'll try and cook up something when my brain starts working again.
Will Deacon April 26, 2018, 4:55 p.m. UTC | #2
Hi Peter,

On Thu, Apr 26, 2018 at 05:53:35PM +0200, Peter Zijlstra wrote:
> On Thu, Apr 26, 2018 at 11:34:19AM +0100, Will Deacon wrote:
> > @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
> >  	}
> >  
> >  	/*
> > +	 * If we observe any contention; queue.
> > +	 */
> > +	if (val & ~_Q_LOCKED_MASK)
> > +		goto queue;
> > +
> > +	/*
> >  	 * trylock || pending
> >  	 *
> >  	 * 0,0,0 -> 0,0,1 ; trylock
> >  	 * 0,0,1 -> 0,1,1 ; pending
> >  	 */
> > +	val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
> > +	if (!(val & ~_Q_LOCKED_MASK)) {
> >  		/*
> > +		 * we're pending, wait for the owner to go away.
> > +		 *
> > +		 * *,1,1 -> *,1,0
> 
> Tail must be 0 here, right?

Not necessarily. If we're concurrently setting pending with another slowpath
locker, they could queue in the tail behind us, so we can't mess with those
upper bits.

> > +		 *
> > +		 * this wait loop must be a load-acquire such that we match the
> > +		 * store-release that clears the locked bit and create lock
> > +		 * sequentiality; this is because not all
> > +		 * clear_pending_set_locked() implementations imply full
> > +		 * barriers.
> >  		 */
> > +		if (val & _Q_LOCKED_MASK) {
> > +			smp_cond_load_acquire(&lock->val.counter,
> > +					      !(VAL & _Q_LOCKED_MASK));
> > +		}
> >  
> >  		/*
> > +		 * take ownership and clear the pending bit.
> > +		 *
> > +		 * *,1,0 -> *,0,1
> >  		 */
> 
> Idem.

Same here, hence why clear_pending_set_locked is either a 16-bit store or an
RmW (we can't just clobber the tail with 0).

> > +		clear_pending_set_locked(lock);
> >  		return;
> > +	}
> >  
> >  	/*
> > +	 * If pending was clear but there are waiters in the queue, then
> > +	 * we need to undo our setting of pending before we queue ourselves.
> >  	 */
> > +	if (!(val & _Q_PENDING_MASK))
> > +		clear_pending(lock);
> 
> This is the branch for when we have !0 tail.

That's the case where "val" has a !0 tail, but I think the comments are
trying to talk about the status of the lockword in memory, no?

> >  	/*
> >  	 * End of pending bit optimistic spinning and beginning of MCS
> 
> > @@ -445,15 +459,15 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
> >  	 * claim the lock:
> >  	 *
> >  	 * n,0,0 -> 0,0,1 : lock, uncontended
> > +	 * *,*,0 -> *,*,1 : lock, contended
> >  	 *
> > +	 * If the queue head is the only one in the queue (lock value == tail)
> > +	 * and nobody is pending, clear the tail code and grab the lock.
> > +	 * Otherwise, we only need to grab the lock.
> >  	 */
> >  	for (;;) {
> >  		/* In the PV case we might already have _Q_LOCKED_VAL set */
> > +		if ((val & _Q_TAIL_MASK) != tail || (val & _Q_PENDING_MASK)) {
> >  			set_locked(lock);
> >  			break;
> >  		}
> 
> This one hunk is terrible on the brain. I'm fairly sure I get it, but I
> feel that comment can use help. Or at least, I need help reading it.
> 
> I'll try and cook up something when my brain starts working again.

Cheers. I think the code is a bit easier to read if you look at it after the
whole series is applied, but the comments could probably still be improved.

Will
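
One possible annotated reading of the hunk discussed above, sketched from the commit message and the new comment in the patch (the cmpxchg at the end of the loop lies outside the quoted hunk and is not shown):

	for (;;) {
		/*
		 * Someone queued behind us (tail changed) or a concurrent
		 * locker has set the pending bit: leave both fields alone
		 * and only flip the locked byte, i.e. *,*,0 -> *,*,1.
		 */
		if ((val & _Q_TAIL_MASK) != tail || (val & _Q_PENDING_MASK)) {
			set_locked(lock);
			break;
		}

		/*
		 * Otherwise we are the only queued waiter and nobody is
		 * pending, so the cmpxchg that follows in the loop may
		 * clear the tail while taking the lock: n,0,0 -> 0,0,1.
		 */
		...
	}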
Waiman Long April 26, 2018, 8:16 p.m. UTC | #3
On 04/26/2018 06:34 AM, Will Deacon wrote:
> The qspinlock locking slowpath utilises a "pending" bit as a simple form
> of an embedded test-and-set lock that can avoid the overhead of explicit
> queuing in cases where the lock is held but uncontended. This bit is
> managed using a cmpxchg loop which tries to transition the uncontended
> lock word from (0,0,0) -> (0,0,1) or (0,0,1) -> (0,1,1).
>
> [...]
>
> diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
> index 2711940429f5..2dbad2f25480 100644
> --- a/kernel/locking/qspinlock_paravirt.h
> +++ b/kernel/locking/qspinlock_paravirt.h
> @@ -118,11 +118,6 @@ static __always_inline void set_pending(struct qspinlock *lock)
>  	WRITE_ONCE(lock->pending, 1);
>  }
>  
> -static __always_inline void clear_pending(struct qspinlock *lock)
> -{
> -	WRITE_ONCE(lock->pending, 0);
> -}
> -
>  /*
>   * The pending bit check in pv_queued_spin_steal_lock() isn't a memory
>   * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the

There is another clear_pending() function after the "#else /*
_Q_PENDING_BITS == 8 */" line that need to be removed as well.

-Longman
Will Deacon April 27, 2018, 10:16 a.m. UTC | #4
Hi Waiman,

On Thu, Apr 26, 2018 at 04:16:30PM -0400, Waiman Long wrote:
> On 04/26/2018 06:34 AM, Will Deacon wrote:
> > diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
> > index 2711940429f5..2dbad2f25480 100644
> > --- a/kernel/locking/qspinlock_paravirt.h
> > +++ b/kernel/locking/qspinlock_paravirt.h
> > @@ -118,11 +118,6 @@ static __always_inline void set_pending(struct qspinlock *lock)
> >  	WRITE_ONCE(lock->pending, 1);
> >  }
> >  
> > -static __always_inline void clear_pending(struct qspinlock *lock)
> > -{
> > -	WRITE_ONCE(lock->pending, 0);
> > -}
> > -
> >  /*
> >   * The pending bit check in pv_queued_spin_steal_lock() isn't a memory
> >   * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the
> 
> There is another clear_pending() function after the "#else /*
> _Q_PENDING_BITS == 8 */" line that need to be removed as well.

Bugger, sorry I missed that one. Is the >= 16K CPUs case supported elsewhere
in Linux? The x86 Kconfig appears to clamp NR_CPUS to 8192 iiuc.

Anyway, additional patch below. Ingo -- please can you apply this on top?

Thanks,

Will

--->8

From ef6aa51e47047fe1a57dfdbe2f45caf63fa95be4 Mon Sep 17 00:00:00 2001
From: Will Deacon <will.deacon@arm.com>

Date: Fri, 27 Apr 2018 10:40:13 +0100
Subject: [PATCH] locking/qspinlock: Remove duplicate clear_pending function
 from PV code

The native clear_pending function is identical to the PV version, so the
latter can simply be removed. This fixes the build for systems with >=
16K CPUs using the PV lock implementation.

Cc: Ingo Molnar <mingo@kernel.org>
Cc: Peter Zijlstra <peterz@infradead.org>
Reported-by: Waiman Long <longman@redhat.com>
Signed-off-by: Will Deacon <will.deacon@arm.com>

---
 kernel/locking/qspinlock_paravirt.h | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
index 25730b2ac022..5a0cf5f9008c 100644
--- a/kernel/locking/qspinlock_paravirt.h
+++ b/kernel/locking/qspinlock_paravirt.h
@@ -130,11 +130,6 @@ static __always_inline void set_pending(struct qspinlock *lock)
 	atomic_or(_Q_PENDING_VAL, &lock->val);
 }
 
-static __always_inline void clear_pending(struct qspinlock *lock)
-{
-	atomic_andnot(_Q_PENDING_VAL, &lock->val);
-}
-
 static __always_inline int trylock_clear_pending(struct qspinlock *lock)
 {
 	int val = atomic_read(&lock->val);
-- 
2.1.4
Waiman Long April 27, 2018, 1:09 p.m. UTC | #5
On 04/27/2018 06:16 AM, Will Deacon wrote:
> Hi Waiman,
>
> On Thu, Apr 26, 2018 at 04:16:30PM -0400, Waiman Long wrote:
>> On 04/26/2018 06:34 AM, Will Deacon wrote:
>>> diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
>>> index 2711940429f5..2dbad2f25480 100644
>>> --- a/kernel/locking/qspinlock_paravirt.h
>>> +++ b/kernel/locking/qspinlock_paravirt.h
>>> @@ -118,11 +118,6 @@ static __always_inline void set_pending(struct qspinlock *lock)
>>>  	WRITE_ONCE(lock->pending, 1);
>>>  }
>>>  
>>> -static __always_inline void clear_pending(struct qspinlock *lock)
>>> -{
>>> -	WRITE_ONCE(lock->pending, 0);
>>> -}
>>> -
>>>  /*
>>>   * The pending bit check in pv_queued_spin_steal_lock() isn't a memory
>>>   * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the
>> There is another clear_pending() function after the "#else /*
>> _Q_PENDING_BITS == 8 */" line that need to be removed as well.
> Bugger, sorry I missed that one. Is the >= 16K CPUs case supported elsewhere
> in Linux? The x86 Kconfig appears to clamp NR_CPUS to 8192 iiuc.
>
> Anyway, additional patch below. Ingo -- please can you apply this on top?
>

I don't think we support >= 16k in any of the distros. However, this
will be a limit that we will reach eventually. That is why I said we can
wait.

Cheers,
Longman
Peter Zijlstra April 28, 2018, 12:45 p.m. UTC | #6
On Thu, Apr 26, 2018 at 05:55:19PM +0100, Will Deacon wrote:
> Hi Peter,
> 
> On Thu, Apr 26, 2018 at 05:53:35PM +0200, Peter Zijlstra wrote:
> > On Thu, Apr 26, 2018 at 11:34:19AM +0100, Will Deacon wrote:
> > > @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
> > >  	}
> > >  
> > >  	/*
> > > +	 * If we observe any contention; queue.
> > > +	 */
> > > +	if (val & ~_Q_LOCKED_MASK)
> > > +		goto queue;
> > > +
> > > +	/*
> > >  	 * trylock || pending
> > >  	 *
> > >  	 * 0,0,0 -> 0,0,1 ; trylock
> > >  	 * 0,0,1 -> 0,1,1 ; pending
> > >  	 */
> > > +	val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
> > > +	if (!(val & ~_Q_LOCKED_MASK)) {
> > >  		/*
> > > +		 * we're pending, wait for the owner to go away.
> > > +		 *
> > > +		 * *,1,1 -> *,1,0
> > 
> > Tail must be 0 here, right?
> 
> Not necessarily. If we're concurrently setting pending with another slowpath
> locker, they could queue in the tail behind us, so we can't mess with those
> upper bits.

Could be my brain just entirely stopped working; but I read that as:

	!(val & ~0xFF) := !(val & 0xFFFFFF00)

which then pretty much mandates the top bits are empty, no?
Will Deacon April 30, 2018, 8:53 a.m. UTC | #7
On Sat, Apr 28, 2018 at 02:45:37PM +0200, Peter Zijlstra wrote:
> On Thu, Apr 26, 2018 at 05:55:19PM +0100, Will Deacon wrote:
> > On Thu, Apr 26, 2018 at 05:53:35PM +0200, Peter Zijlstra wrote:
> > > On Thu, Apr 26, 2018 at 11:34:19AM +0100, Will Deacon wrote:
> > > > @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
> > > >  	}
> > > >  
> > > >  	/*
> > > > +	 * If we observe any contention; queue.
> > > > +	 */
> > > > +	if (val & ~_Q_LOCKED_MASK)
> > > > +		goto queue;
> > > > +
> > > > +	/*
> > > >  	 * trylock || pending
> > > >  	 *
> > > >  	 * 0,0,0 -> 0,0,1 ; trylock
> > > >  	 * 0,0,1 -> 0,1,1 ; pending
> > > >  	 */
> > > > +	val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
> > > > +	if (!(val & ~_Q_LOCKED_MASK)) {
> > > >  		/*
> > > > +		 * we're pending, wait for the owner to go away.
> > > > +		 *
> > > > +		 * *,1,1 -> *,1,0
> > > 
> > > Tail must be 0 here, right?
> > 
> > Not necessarily. If we're concurrently setting pending with another slowpath
> > locker, they could queue in the tail behind us, so we can't mess with those
> > upper bits.
> 
> Could be my brain just entirely stopped working; but I read that as:
> 
> 	!(val & ~0xFF) := !(val & 0xFFFFFF00)
> 
> which then pretty much mandates the top bits are empty, no?

Only if there isn't a concurrent locker. For example:


T0:
// fastpath fails to acquire the lock, returns val == _Q_LOCKED_VAL

if (val & ~_Q_LOCKED_MASK)
	goto queue;

// Fallthrough

T1:
// fastpath fails to acquire the lock, returns val == _Q_LOCKED_VAL

if (val & ~_Q_LOCKED_MASK)
	goto queue;

// Fallthrough

T0:
val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
// val == _Q_LOCKED_VAL

T1:
val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
// val == _Q_PENDING_VAL | _Q_LOCKED_VAL
// Queue into tail

T0:
// Spins for _Q_LOCKED_MASK to go to zero, but tail is *non-zero*


So it's really down to whether the state transitions in the comments refer
to the lockword in memory, or the local "val" variable. I think the former
is more instructive, because the whole thing is concurrent.

Will
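
For readers following the mask arithmetic in the exchange above, here is a small standalone sketch of the lock-word layout being discussed. The mask values are reconstructed for the common _Q_PENDING_BITS == 8 configuration (NR_CPUS < 16K) and the tail code used for T1 is illustrative, not taken from the thread:

#include <assert.h>
#include <stdio.h>

/* Layout with _Q_PENDING_BITS == 8 (NR_CPUS < 16K): */
#define _Q_LOCKED_MASK   0x000000ffU	/* bits  0- 7: locked byte      */
#define _Q_PENDING_MASK  0x0000ff00U	/* bits  8-15: pending byte     */
#define _Q_TAIL_MASK     0xffff0000U	/* bits 16-31: tail (idx + cpu) */
#define _Q_LOCKED_VAL    (1U << 0)
#define _Q_PENDING_VAL   (1U << 8)

int main(void)
{
	/*
	 * Peter's reading of the snapshot test: ~_Q_LOCKED_MASK really is
	 * 0xffffff00, so a value passing !(val & ~_Q_LOCKED_MASK) had no
	 * tail and no pending bit *at the time of the fetch_or*.
	 */
	assert(~_Q_LOCKED_MASK == 0xffffff00U);

	/* Will's point: the word in memory can still change afterwards.
	 * Following the T0/T1 trace above, T0's snapshot looked clean... */
	unsigned int t0_snapshot = _Q_LOCKED_VAL;		/* (0,0,1) */

	/* ...but by the time T0 spins, T0 has set pending and T1 has
	 * queued, so the in-memory word carries a non-zero tail
	 * (1U << 18 is an illustrative tail code for CPU 0, idx 0). */
	unsigned int in_memory = _Q_LOCKED_VAL | _Q_PENDING_VAL | (1U << 18);

	assert(!(t0_snapshot & ~_Q_LOCKED_MASK));	/* snapshot: (0,0,1) */
	assert(in_memory & _Q_TAIL_MASK);		/* memory:   (n,1,1) */
	printf("snapshot=0x%08x memory=0x%08x\n", t0_snapshot, in_memory);
	return 0;
}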

Patch

diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index a0f7976348f8..ad94b7def005 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -128,6 +128,17 @@  static inline __pure struct mcs_spinlock *decode_tail(u32 tail)
 
 #if _Q_PENDING_BITS == 8
 /**
+ * clear_pending - clear the pending bit.
+ * @lock: Pointer to queued spinlock structure
+ *
+ * *,1,* -> *,0,*
+ */
+static __always_inline void clear_pending(struct qspinlock *lock)
+{
+	WRITE_ONCE(lock->pending, 0);
+}
+
+/**
  * clear_pending_set_locked - take ownership and clear the pending bit.
  * @lock: Pointer to queued spinlock structure
  *
@@ -163,6 +174,17 @@  static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
 #else /* _Q_PENDING_BITS == 8 */
 
 /**
+ * clear_pending - clear the pending bit.
+ * @lock: Pointer to queued spinlock structure
+ *
+ * *,1,* -> *,0,*
+ */
+static __always_inline void clear_pending(struct qspinlock *lock)
+{
+	atomic_andnot(_Q_PENDING_VAL, &lock->val);
+}
+
+/**
  * clear_pending_set_locked - take ownership and clear the pending bit.
  * @lock: Pointer to queued spinlock structure
  *
@@ -266,7 +288,7 @@  static __always_inline u32  __pv_wait_head_or_lock(struct qspinlock *lock,
 void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 {
 	struct mcs_spinlock *prev, *next, *node;
-	u32 new, old, tail;
+	u32 old, tail;
 	int idx;
 
 	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
@@ -290,58 +312,50 @@  void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 	}
 
 	/*
+	 * If we observe any contention; queue.
+	 */
+	if (val & ~_Q_LOCKED_MASK)
+		goto queue;
+
+	/*
 	 * trylock || pending
 	 *
 	 * 0,0,0 -> 0,0,1 ; trylock
 	 * 0,0,1 -> 0,1,1 ; pending
 	 */
-	for (;;) {
+	val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
+	if (!(val & ~_Q_LOCKED_MASK)) {
 		/*
-		 * If we observe any contention; queue.
+		 * we're pending, wait for the owner to go away.
+		 *
+		 * *,1,1 -> *,1,0
+		 *
+		 * this wait loop must be a load-acquire such that we match the
+		 * store-release that clears the locked bit and create lock
+		 * sequentiality; this is because not all
+		 * clear_pending_set_locked() implementations imply full
+		 * barriers.
 		 */
-		if (val & ~_Q_LOCKED_MASK)
-			goto queue;
-
-		new = _Q_LOCKED_VAL;
-		if (val == new)
-			new |= _Q_PENDING_VAL;
+		if (val & _Q_LOCKED_MASK) {
+			smp_cond_load_acquire(&lock->val.counter,
+					      !(VAL & _Q_LOCKED_MASK));
+		}
 
 		/*
-		 * Acquire semantic is required here as the function may
-		 * return immediately if the lock was free.
+		 * take ownership and clear the pending bit.
+		 *
+		 * *,1,0 -> *,0,1
 		 */
-		old = atomic_cmpxchg_acquire(&lock->val, val, new);
-		if (old == val)
-			break;
-
-		val = old;
-	}
-
-	/*
-	 * we won the trylock
-	 */
-	if (new == _Q_LOCKED_VAL)
+		clear_pending_set_locked(lock);
 		return;
+	}
 
 	/*
-	 * we're pending, wait for the owner to go away.
-	 *
-	 * *,1,1 -> *,1,0
-	 *
-	 * this wait loop must be a load-acquire such that we match the
-	 * store-release that clears the locked bit and create lock
-	 * sequentiality; this is because not all clear_pending_set_locked()
-	 * implementations imply full barriers.
-	 */
-	smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_MASK));
-
-	/*
-	 * take ownership and clear the pending bit.
-	 *
-	 * *,1,0 -> *,0,1
+	 * If pending was clear but there are waiters in the queue, then
+	 * we need to undo our setting of pending before we queue ourselves.
 	 */
-	clear_pending_set_locked(lock);
-	return;
+	if (!(val & _Q_PENDING_MASK))
+		clear_pending(lock);
 
 	/*
 	 * End of pending bit optimistic spinning and beginning of MCS
@@ -445,15 +459,15 @@  void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 	 * claim the lock:
 	 *
 	 * n,0,0 -> 0,0,1 : lock, uncontended
-	 * *,0,0 -> *,0,1 : lock, contended
+	 * *,*,0 -> *,*,1 : lock, contended
 	 *
-	 * If the queue head is the only one in the queue (lock value == tail),
-	 * clear the tail code and grab the lock. Otherwise, we only need
-	 * to grab the lock.
+	 * If the queue head is the only one in the queue (lock value == tail)
+	 * and nobody is pending, clear the tail code and grab the lock.
+	 * Otherwise, we only need to grab the lock.
 	 */
 	for (;;) {
 		/* In the PV case we might already have _Q_LOCKED_VAL set */
-		if ((val & _Q_TAIL_MASK) != tail) {
+		if ((val & _Q_TAIL_MASK) != tail || (val & _Q_PENDING_MASK)) {
 			set_locked(lock);
 			break;
 		}
diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
index 2711940429f5..2dbad2f25480 100644
--- a/kernel/locking/qspinlock_paravirt.h
+++ b/kernel/locking/qspinlock_paravirt.h
@@ -118,11 +118,6 @@  static __always_inline void set_pending(struct qspinlock *lock)
 	WRITE_ONCE(lock->pending, 1);
 }
 
-static __always_inline void clear_pending(struct qspinlock *lock)
-{
-	WRITE_ONCE(lock->pending, 0);
-}
-
 /*
  * The pending bit check in pv_queued_spin_steal_lock() isn't a memory
  * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the