[PATCH 02/17] powerpc/qspinlock: add mcs queueing for contended waiters

Nicholas Piggin npiggin at gmail.com
Thu Nov 10 20:21:13 AEDT 2022


On Thu Nov 10, 2022 at 10:36 AM AEST, Jordan Niethe wrote:
> On Thu, 2022-07-28 at 16:31 +1000, Nicholas Piggin wrote:
> <snip>
> [resend as utf-8, not utf-7]
> >  
> > +/*
> > + * Bitfields in the atomic value:
> > + *
> > + *     0: locked bit
> > + * 16-31: tail cpu (+1)
> > + */
> > +#define	_Q_SET_MASK(type)	(((1U << _Q_ ## type ## _BITS) - 1)\
> > +				      << _Q_ ## type ## _OFFSET)
> > +#define _Q_LOCKED_OFFSET	0
> > +#define _Q_LOCKED_BITS		1
> > +#define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)
> > +#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)
> > +
> > +#define _Q_TAIL_CPU_OFFSET	16
> > +#define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)
> > +#define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)
> > +
>
> Just to state the obvious this is:
>
> #define _Q_LOCKED_OFFSET	0
> #define _Q_LOCKED_BITS		1
> #define _Q_LOCKED_MASK		0x00000001
> #define _Q_LOCKED_VAL		1
>
> #define _Q_TAIL_CPU_OFFSET	16
> #define _Q_TAIL_CPU_BITS	16
> #define _Q_TAIL_CPU_MASK	0xffff0000

Yeah. I'm wondering if that's a better style in the first place. In the
generic qspinlock these values can change, so there's slightly more
reason to do it that way there.

> > +#if CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)
> > +#error "qspinlock does not support such large CONFIG_NR_CPUS"
> > +#endif
> > +
> >  #endif /* _ASM_POWERPC_QSPINLOCK_TYPES_H */
> > diff --git a/arch/powerpc/lib/qspinlock.c b/arch/powerpc/lib/qspinlock.c
> > index 8dbce99a373c..5ebb88d95636 100644
> > --- a/arch/powerpc/lib/qspinlock.c
> > +++ b/arch/powerpc/lib/qspinlock.c
> > @@ -1,12 +1,172 @@
> >  // SPDX-License-Identifier: GPL-2.0-or-later
> > +#include <linux/atomic.h>
> > +#include <linux/bug.h>
> > +#include <linux/compiler.h>
> >  #include <linux/export.h>
> > -#include <linux/processor.h>
> > +#include <linux/percpu.h>
> > +#include <linux/smp.h>
> >  #include <asm/qspinlock.h>
> >  
> > -void queued_spin_lock_slowpath(struct qspinlock *lock)
> > +#define MAX_NODES	4
> > +
> > +struct qnode {
> > +	struct qnode	*next;
> > +	struct qspinlock *lock;
> > +	u8		locked; /* 1 if lock acquired */
> > +};
> > +
> > +struct qnodes {
> > +	int		count;
> > +	struct qnode nodes[MAX_NODES];
> > +};
>
> I think it could be worth commenting why qnodes::count instead _Q_TAIL_IDX_OFFSET.

I wasn't sure what you meant by this.

> > +
> > +static DEFINE_PER_CPU_ALIGNED(struct qnodes, qnodes);
> > +
> > +static inline int encode_tail_cpu(void)
>
> I think the generic version that takes smp_processor_id() as a parameter is clearer - at least with this function name.

Agree.
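
Something like this, taking the CPU as a parameter as the generic
version does (untested sketch, keeping the same encoding):

static inline int encode_tail_cpu(int cpu)
{
	return (cpu + 1) << _Q_TAIL_CPU_OFFSET;
}

with callers passing smp_processor_id().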

> > +{
> > +	return (smp_processor_id() + 1) << _Q_TAIL_CPU_OFFSET;
> > +}
> > +
> > +static inline int get_tail_cpu(int val)
>
> It seems like there should be a "decode" function to pair up with the "encode" function.

Agree.
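
i.e. something like this to pair with the encode (sketch):

static inline int decode_tail_cpu(int val)
{
	return (val >> _Q_TAIL_CPU_OFFSET) - 1;
}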

> > +{
> > +	return (val >> _Q_TAIL_CPU_OFFSET) - 1;
> > +}
> > +
> > +/* Take the lock by setting the bit, no other CPUs may concurrently lock it. */
>
> Does that comment mean it is not necessary to use an atomic_or here?

No, only that it can't be locked by another CPU concurrently. The word
can still be modified by another queuer updating the tail field, so the
atomic op is still needed.
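
To illustrate, a non-atomic read/modify/write would lose a concurrent
tail update in the same word (broken sketch, for illustration only):

static __always_inline void set_locked_broken(struct qspinlock *lock)
{
	int val = atomic_read(&lock->val);	/* sees tail == A */
	/* another CPU's publish_tail_cpu() changes the tail to B here */
	atomic_set(&lock->val, val | _Q_LOCKED_VAL); /* stores tail A back, B's queue entry is lost */
}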

> > +static __always_inline void lock_set_locked(struct qspinlock *lock)
>
> nit: could just be called set_locked()

Yep.

> > +{
> > +	atomic_or(_Q_LOCKED_VAL, &lock->val);
> > +	__atomic_acquire_fence();
> > +}
> > +
> > +/* Take lock, clearing tail, cmpxchg with val (which must not be locked) */
> > +static __always_inline int trylock_clear_tail_cpu(struct qspinlock *lock, int val)
> > +{
> > +	int newval = _Q_LOCKED_VAL;
> > +
> > +	if (atomic_cmpxchg_acquire(&lock->val, val, newval) == val)
> > +		return 1;
> > +	else
> > +		return 0;
>
> same optional style nit: return (atomic_cmpxchg_acquire(&lock->val, val, newval) == val);

Am thinking about it :)

> > +}
> > +
> > +/*
> > + * Publish our tail, replacing previous tail. Return previous value.
> > + *
> > + * This provides a release barrier for publishing node, and an acquire barrier
> > + * for getting the old node.
> > + */
> > +static __always_inline int publish_tail_cpu(struct qspinlock *lock, int tail)
>
> Did you change from the xchg_tail() name in the generic version because of the release and acquire barriers this provides?
> Does "publish" generally imply the old value will be returned?

Yes, "publish" I thought is a bit more obvious about where the node
becomes visible to other CPUs. It doesn't imply the old value is
returned, but I thought those semantics are the self-documenting part.
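
FWIW, roughly how the returned old value gets used in the slowpath
(a sketch of the usual MCS linking step, not the exact patch code):

	old = publish_tail_cpu(lock, tail);
	if (old & _Q_TAIL_CPU_MASK) {
		/* a previous waiter exists: link behind it and spin on our node */
		struct qnode *prev = get_tail_qnode(lock, old);
		WRITE_ONCE(prev->next, node);
		while (!READ_ONCE(node->locked))
			cpu_relax();
	}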

>
> >  {
> > -	while (!queued_spin_trylock(lock))
> > +	for (;;) {
> > +		int val = atomic_read(&lock->val);
> > +		int newval = (val & ~_Q_TAIL_CPU_MASK) | tail;
> > +		int old;
> > +
> > +		old = atomic_cmpxchg(&lock->val, val, newval);
> > +		if (old == val)
> > +			return old;
> > +	}
> > +}
> > +
> > +static struct qnode *get_tail_qnode(struct qspinlock *lock, int val)
> > +{
> > +	int cpu = get_tail_cpu(val);
> > +	struct qnodes *qnodesp = per_cpu_ptr(&qnodes, cpu);
> > +	int idx;
> > +
> > +	for (idx = 0; idx < MAX_NODES; idx++) {
> > +		struct qnode *qnode = &qnodesp->nodes[idx];
> > +		if (qnode->lock == lock)
> > +			return qnode;
> > +	}
>
> In case anyone else is confused by this, Nick explained each cpu can only queue on a unique spinlock once regardless of "idx" level.
>
> > +
> > +	BUG();
> > +}
> > +
> > +static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
> > +{
> > +	struct qnodes *qnodesp;
> > +	struct qnode *next, *node;
> > +	int val, old, tail;
> > +	int idx;
> > +
> > +	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
> > +
> > +	qnodesp = this_cpu_ptr(&qnodes);
> > +	if (unlikely(qnodesp->count == MAX_NODES)) {
>
> The comparison is >= in the generic, I guess we've no nested NMI so this is safe?

No... we could have nested NMI so this is wrong, good catch.
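
So it should be >= with an overflow fallback, along the lines of what
the generic code does (untested sketch):

	if (unlikely(qnodesp->count >= MAX_NODES)) {
		/* no free node (e.g. nested in NMI), just spin on the lock word */
		while (!queued_spin_trylock(lock))
			cpu_relax();
		return;
	}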

Thanks,
Nick

