
[RFC,5/6] implement per-cpu&per-domain state machine call_srcu()

Message ID 1331027858-7648-5-git-send-email-laijs@cn.fujitsu.com
State New

Commit Message

Lai Jiangshan March 6, 2012, 9:57 a.m. UTC
Known issues:
	1) All call_srcu() calls for a domain contend on that domain's ->gp_lock.
	 = Not a real problem in practice; update sites should be rare for any
	   *given* SRCU domain.

	2) Since the state machine is per-domain and update sites are rare,
	   per-CPU may be overkill, and it slows the processing down a little.
	 = The right answer is unknown for now. If there are fewer than ~5
	   callbacks per grace period, I suggest a single state machine per
	   domain; otherwise per-CPU per-domain.

	3) The per-CPU state machines are not processed fully in parallel.
	 = It is not a problem while callbacks are rare.
	   It is easy to fix by shrinking the ->gp_lock-protected region in the
	   state machine; I will add that code when needed.

	4) If a reader is preempted or sleeps for a long time while several
	   CPU state machines are active (have pending callbacks), those CPUs
	   may all do the periodic check_zero and all fail, which is wasted work.
	 = We can elect one of these CPU state machines as a leader to do the
	   periodic check_zero, instead of all of them doing it.
	   This does not need to be fixed now; it will be fixed later.

Design:
o	The SRCU callback is a new thing; I want it to be completely
	preemptible, even sleepable, and it is in this implementation: I use a
	work_struct to stand for every SRCU callback.
	I notice some RCU callbacks also use a work_struct for sleepable work,
	but that indirection pushes complexity to the caller and those callbacks
	are not rcu_barrier()-aware. (A caller-side usage sketch follows this
	list.)

o	The state machine is lightweight; it is preemptible while checking
	(if issue 3 above is fixed, it becomes almost fully preemptible), but it
	does not sleep (so it never occupies a thread by sleeping).
	It is also a work_struct, so no thread is occupied
	by SRCU when the domain is not active (no callbacks).
	It processes at most 10 callbacks per run and then returns (also
	returning the worker thread), so the whole SRCU is efficient in
	resource usage.

o	A callback is invoked on the same CPU where it was queued.
	The state-machine work_struct and the callback work_struct may run
	in the same worker thread without extra scheduling.

o	The workqueue handles all hot-plug issues for us,
	so this implementation depends heavily on workqueue. DRY.
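
A caller-side usage sketch (hypothetical names, error handling omitted), just
to show how the API in this patch is meant to be used; because the callback is
a work_struct, it runs from the srcu workqueue and may sleep:

	#include <linux/slab.h>
	#include <linux/srcu.h>

	static struct srcu_struct my_srcu;	/* init_srcu_struct(&my_srcu) at setup */

	struct my_obj {
		int data;
		struct srcu_head srcu_head;	/* embedded, like rcu_head */
	};

	/* Invoked from the srcu workqueue after a grace period; may sleep. */
	static void my_obj_free_cb(struct srcu_head *head)
	{
		struct my_obj *obj = container_of(head, struct my_obj, srcu_head);

		kfree(obj);
	}

	static void my_obj_release(struct my_obj *obj)
	{
		/* Readers use srcu_read_lock(&my_srcu); free only after they finish. */
		call_srcu(&my_srcu, &obj->srcu_head, my_obj_free_cb);
	}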


Others:
	srcu_head is bigger, but it is worth it: it provides more capability and
	simplifies the srcu code.

	srcu_barrier() is not tested.

	I think the *_expedited variants can be removed, because the mb()-based
	srcu is already much faster than before.

The trip of a callback:
	1) call_srcu() records (snapshots) the chck_seq of the srcu domain.

	2) if callback->chck_seq < srcu_domain->zero_seq[0] &&
	      callback->chck_seq < srcu_domain->zero_seq[1]
	   deliver the callback.
	   If not, it stays queued and the state machine does a flip or a
	   check_zero.

	3) the callback is called in the workqueue.

The process of check_zero:
	1) advance srcu_domain->chck_seq when needed
	   (optional, but I make it larger than the chck_seq of all queued
	   callbacks)

	2) do check_zero

	3) commit srcu_domain->chck_seq to ->zero_seq[idx] on success.

	=) So if a callback sees callback->chck_seq < srcu_domain->zero_seq[idx],
	   it has seen all of the above steps, which means that all readers that
	   started before the callback was queued (and incremented the active
	   counter of idx) have completed.
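
	An illustrative walk-through (invented numbers): a callback is queued
	while chck_seq == 4, so it snapshots callback->chck_seq = 4; check_zero
	on one index bumps chck_seq to 5, finds that index's counters at zero
	and commits zero_seq[0] = 5; after the flip, check_zero on the other
	index succeeds and commits zero_seq[1] = 5; now 4 < zero_seq[0] and
	4 < zero_seq[1], so the callback is delivered to the workqueue.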


Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
---
 include/linux/srcu.h |   43 ++++++-
 kernel/srcu.c        |  400 ++++++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 379 insertions(+), 64 deletions(-)

Comments

Peter Zijlstra March 6, 2012, 10:47 a.m. UTC | #1
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> o       The srcu callback is new thing, I hope it is completely preemptible,
>         even sleepable. It does in this implemetation, I use work_struct
>         to stand for every srcu callback. 

I didn't need the callbacks to sleep too, I just needed the read-side
srcu bit.

There's an argument against making the callbacks able to sleep like that
in that you typically want to minimize the amount of work done in the
callbacks, allowing them to sleep invites to callbacks that do _way_ too
much work.

I haven't made my mind up if I care yet.. :-)
Peter Zijlstra March 6, 2012, 10:58 a.m. UTC | #2
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>  /*
> + * 'return left < right;' but handle the overflow issues.
> + * The same as 'return (long)(right - left) > 0;' but it cares more.

About what? And why? We do the (long)(a - b) thing all over the kernel,
why would you care more?

> + */
> +static inline
> +bool safe_less_than(unsigned long left, unsigned long right, unsigned long max)
> +{
> +       unsigned long a = right - left;
> +       unsigned long b = max - left;
> +
> +       return !!a && (a <= b);
> +}
Peter Zijlstra March 6, 2012, 11:16 a.m. UTC | #3
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>         srcu_head is bigger, it is worth, it provides more ability and simplify
>         the srcu code. 

Dubious claim.. memory footprint of various data structures is deemed
important. rcu_head is 16 bytes, srcu_head is 32 bytes. I think it would
be real nice not to have two different callback structures and not grow
them as large.
Peter Zijlstra March 6, 2012, 11:17 a.m. UTC | #4
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> +       /* the sequence of check zero */
> +       unsigned long chck_seq; 

So you're saving 1 character at the expense of readability, weird
trade-off.
Peter Zijlstra March 6, 2012, 11:22 a.m. UTC | #5
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> +       int idxb = sp->completed & 0X1UL;

I didn't even know you could write a hex literal with an upper-case X.

How weird :-)
Peter Zijlstra March 6, 2012, 11:35 a.m. UTC | #6
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> +/* Must called with sp->flip_check_mutex and sp->gp_lock held; */
> +static bool check_zero_idx(struct srcu_struct *sp, int idx,
> +               struct srcu_head *head, int trycount)
> +{
> +       unsigned long chck_seq;
> +       bool checked_zero;
> + 

	lockdep_assert_held(&sp->flip_check_mutex);
	lockdep_assert_held(&sp->gp_lock);

and you can do away with your comment (also you forgot a verb).
Peter Zijlstra March 6, 2012, 11:36 a.m. UTC | #7
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> + * Must be called with sp->flip_check_mutex and sp->gp_lock held;
> + * Must be called from process contex, 

Holding a mutex pretty much guarantees you're in process context,
no? :-)
Peter Zijlstra March 6, 2012, 11:39 a.m. UTC | #8
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> +       int cpu = get_cpu();
> +       struct srcu_cpu_struct *scp = per_cpu_ptr(sp->srcu_per_cpu, cpu) 

They invented get_cpu_ptr() for that..
Peter Zijlstra March 6, 2012, 11:52 a.m. UTC | #9
On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> +void srcu_barrier(struct srcu_struct *sp)
> +{
> +       struct srcu_sync sync;
> +       struct srcu_head *head = &sync.head;
> +       unsigned long chck_seq; /* snap */
> +
> +       int idle_loop = 0;
> +       int cpu;
> +       struct srcu_cpu_struct *scp;
> +
> +       spin_lock_irq(&sp->gp_lock);
> +       chck_seq = sp->chck_seq;
> +       for_each_possible_cpu(cpu) {

ARGH!! this is really not ok.. so we spend all this time killing
srcu_sync_expidited and co because they prod at all cpus for no good
reason, and what do you do?

Also, what happens if your cpu isn't actually online?


> +               scp = per_cpu_ptr(sp->srcu_per_cpu, cpu);
> +               if (scp->head && !safe_less_than(chck_seq, scp->head->chck_seq,
> +                               sp->chck_seq)) {
> +                       /* this path is likely enterred only once */
> +                       init_completion(&sync.completion);
> +                       srcu_queue_callback(sp, scp, head,
> +                                       __synchronize_srcu_callback);
> +                       /* don't need to wakeup the woken state machine */
> +                       spin_unlock_irq(&sp->gp_lock);
> +                       wait_for_completion(&sync.completion);
> +                       spin_lock_irq(&sp->gp_lock);
> +               } else {
> +                       if ((++idle_loop & 0xF) == 0) {
> +                               spin_unlock_irq(&sp->gp_lock);
> +                               udelay(1);
> +                               spin_lock_irq(&sp->gp_lock);
> +                       }

The purpose of this bit isn't quite clear to me, is this simply a lock
break?

> +               }
> +       }
> +       spin_unlock_irq(&sp->gp_lock);
> +
> +       flush_workqueue(srcu_callback_wq);

Since you already waited for the completions one by one, what's the
purpose of this?

> +}
> +EXPORT_SYMBOL_GPL(srcu_barrier);
Lai Jiangshan March 6, 2012, 2:44 p.m. UTC | #10
On Tue, Mar 6, 2012 at 7:52 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>> +void srcu_barrier(struct srcu_struct *sp)
>> +{
>> +       struct srcu_sync sync;
>> +       struct srcu_head *head = &sync.head;
>> +       unsigned long chck_seq; /* snap */
>> +
>> +       int idle_loop = 0;
>> +       int cpu;
>> +       struct srcu_cpu_struct *scp;
>> +
>> +       spin_lock_irq(&sp->gp_lock);
>> +       chck_seq = sp->chck_seq;
>> +       for_each_possible_cpu(cpu) {
>
> ARGH!! this is really not ok.. so we spend all this time killing
> srcu_sync_expidited and co because they prod at all cpus for no good
> reason, and what do you do?

it is srcu_barrier(), it have to wait all callbacks complete for all
cpus since it is per-cpu
implementation.

>
> Also, what happens if your cpu isn't actually online?

The workqueue handles it, not here, if a cpu state machine has callbacks, the
state machine is started, if it has no callback,  srcu_barrier() does
nothing for
this cpu

>
>
>> +               scp = per_cpu_ptr(sp->srcu_per_cpu, cpu);
>> +               if (scp->head && !safe_less_than(chck_seq, scp->head->chck_seq,
>> +                               sp->chck_seq)) {
>> +                       /* this path is likely enterred only once */
>> +                       init_completion(&sync.completion);
>> +                       srcu_queue_callback(sp, scp, head,
>> +                                       __synchronize_srcu_callback);
>> +                       /* don't need to wakeup the woken state machine */
>> +                       spin_unlock_irq(&sp->gp_lock);
>> +                       wait_for_completion(&sync.completion);
>> +                       spin_lock_irq(&sp->gp_lock);
>> +               } else {
>> +                       if ((++idle_loop & 0xF) == 0) {
>> +                               spin_unlock_irq(&sp->gp_lock);
>> +                               udelay(1);
>> +                               spin_lock_irq(&sp->gp_lock);
>> +                       }
>
> The purpose of this bit isn't quite clear to me, is this simply a lock
> break?

Yes, the main purpose is:
make the time of sp->gp_lock short, can be determined.

>
>> +               }
>> +       }
>> +       spin_unlock_irq(&sp->gp_lock);
>> +
>> +       flush_workqueue(srcu_callback_wq);
>
> Since you already waited for the completions one by one, what's the
> purpose of this?
>
>> +}
>> +EXPORT_SYMBOL_GPL(srcu_barrier);
Lai Jiangshan March 6, 2012, 2:47 p.m. UTC | #11
>
>> +               }
>> +       }
>> +       spin_unlock_irq(&sp->gp_lock);
>> +
>> +       flush_workqueue(srcu_callback_wq);
>
> Since you already waited for the completions one by one, what's the
> purpose of this?

srcu callbacks are preemptible/sleepable here, so they may not
complete in order, and some may not have finished at this point,
so srcu_barrier() needs to wait for them.


>
>> +}
>> +EXPORT_SYMBOL_GPL(srcu_barrier);
Lai Jiangshan March 6, 2012, 2:50 p.m. UTC | #12
On Tue, Mar 6, 2012 at 7:39 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>> +       int cpu = get_cpu();
>> +       struct srcu_cpu_struct *scp = per_cpu_ptr(sp->srcu_per_cpu, cpu)
>
> They invented get_cpu_ptr() for that..

Thanks.
And I'm sorry, this code was merged from two functions;
I missed it when merging, I need to check it more.

Lai Jiangshan March 6, 2012, 3:12 p.m. UTC | #13
On Tue, Mar 6, 2012 at 7:16 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>>         srcu_head is bigger, it is worth, it provides more ability and simplify
>>         the srcu code.
>
> Dubious claim.. memory footprint of various data structures is deemed
> important. rcu_head is 16 bytes, srcu_head is 32 bytes. I think it would
> be real nice not to have two different callback structures and not grow
> them as large.

CC: tj@kernel.org
It could be better if workqueue also supports 2*sizeof(long) work callbacks.

I prefer ability/functionality a little more, it eases the caller's pain.
preemptible callbacks also eases the pressure of the whole system.
But I'm also ok if we limit the srcu-callbacks in softirq.

Lai Jiangshan March 6, 2012, 3:17 p.m. UTC | #14
On Tue, Mar 6, 2012 at 6:58 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>>  /*
>> + * 'return left < right;' but handle the overflow issues.
>> + * The same as 'return (long)(right - left) > 0;' but it cares more.
>
> About what? And why? We do the (long)(a - b) thing all over the kernel,
> why would you care more?

@left is constants of  the callers(callbacks's snapshot), @right
increases very slow.
if (long)(right - left) is a big negative, we have to wait for a long
time in this kinds of overflow.
this kinds of overflow can not happen in this safe_less_than()

>
>> + */
>> +static inline
>> +bool safe_less_than(unsigned long left, unsigned long right, unsigned long max)
>> +{
>> +       unsigned long a = right - left;
>> +       unsigned long b = max - left;
>> +
>> +       return !!a && (a <= b);
>> +}
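
A worked illustration of what the helper computes, with invented values on a
64-bit machine (independent of whether such a gap can occur in practice):

	/*
	 * left  = 4                   (callback's snapshot)
	 * right = 4 + (1UL << 63)     (zero_seq value being compared)
	 * max   = right + 5           (current chck_seq)
	 *
	 * (long)(right - left) is LONG_MIN, so '(long)(right - left) > 0'
	 * says "not less" and the callback would keep waiting.
	 *
	 * safe_less_than(left, right, max):
	 *	a = right - left = 1UL << 63		(non-zero)
	 *	b = max - left   = (1UL << 63) + 5
	 *	a <= b, so it correctly reports left < right.
	 */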
Lai Jiangshan March 6, 2012, 3:26 p.m. UTC | #15
On Tue, Mar 6, 2012 at 7:16 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>>         srcu_head is bigger, it is worth, it provides more ability and simplify
>>         the srcu code.
>
> Dubious claim.. memory footprint of various data structures is deemed
> important. rcu_head is 16 bytes, srcu_head is 32 bytes. I think it would
> be real nice not to have two different callback structures and not grow
> them as large.

Also "Dubious" "simplify"?

bigger struct makes we can store ->check_seq in the callback, makes
the processing simply.

bigger struct is not required here, we can use a little complex way(batches)
to do it.

Peter Zijlstra March 6, 2012, 3:31 p.m. UTC | #16
On Tue, 2012-03-06 at 22:44 +0800, Lai Jiangshan wrote:
> >> +                       if ((++idle_loop & 0xF) == 0) {
> >> +                               spin_unlock_irq(&sp->gp_lock);
> >> +                               udelay(1);
> >> +                               spin_lock_irq(&sp->gp_lock);
> >> +                       }
> >
> > The purpose of this bit isn't quite clear to me, is this simply a lock
> > break?
> 
> Yes, the main purpose is:
> make the time of sp->gp_lock short, can be determined. 

either introduce cond_resched_lock_irq() or write something like:

	if (need_resched() || spin_needbreak(&sp->gp_lock)) {
		spin_unlock_irq(&sp->gp_lock);
		cond_resched();
		spin_lock_irq(&sp->gp_lock);
	}

udelay(1) is complete nonsense..
Peter Zijlstra March 6, 2012, 3:32 p.m. UTC | #17
On Tue, 2012-03-06 at 22:44 +0800, Lai Jiangshan wrote:
> it is srcu_barrier(), it have to wait all callbacks complete for all
> cpus since it is per-cpu
> implementation.
> 
If you enqueue each callback as a work, flush_workqueue() alone should
do that. It'll wait for completion of all currently enqueued works but
ignores new works.
Peter Zijlstra March 6, 2012, 3:34 p.m. UTC | #18
On Tue, 2012-03-06 at 23:12 +0800, Lai Jiangshan wrote:
> On Tue, Mar 6, 2012 at 7:16 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> > On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> >>         srcu_head is bigger, it is worth, it provides more ability and simplify
> >>         the srcu code.
> >
> > Dubious claim.. memory footprint of various data structures is deemed
> > important. rcu_head is 16 bytes, srcu_head is 32 bytes. I think it would
> > be real nice not to have two different callback structures and not grow
> > them as large.
> 
> CC: tj@kernel.org
> It could be better if workqueue also supports 2*sizeof(long) work callbacks.

That's going to be very painful if at all possible.

> I prefer ability/functionality a little more, it eases the caller's pain.
> preemptible callbacks also eases the pressure of the whole system.
> But I'm also ok if we limit the srcu-callbacks in softirq.

You don't have to use softirq, you could run a complete list from a
single worklet. Just keep the single linked rcu_head list and enqueue a
static (per-cpu) worker to process the entire list.
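
A rough sketch of that approach (assumed names; the grace-period bookkeeping
that decides when a list becomes ready to run is left out):

	struct srcu_cb_cpu {
		spinlock_t lock;
		struct rcu_head *head, **tail;	/* callbacks ready to invoke */
		struct work_struct work;
	};

	static void srcu_invoke_cbs(struct work_struct *work)
	{
		struct srcu_cb_cpu *cc = container_of(work, struct srcu_cb_cpu, work);
		struct rcu_head *list, *next;

		spin_lock_irq(&cc->lock);
		list = cc->head;		/* detach the whole ready list */
		cc->head = NULL;
		cc->tail = &cc->head;
		spin_unlock_irq(&cc->lock);

		for (; list; list = next) {	/* invoke outside the lock */
			next = list->next;
			list->func(list);	/* plain rcu_head, no per-callback work_struct */
		}
	}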
Peter Zijlstra March 6, 2012, 3:37 p.m. UTC | #19
On Tue, 2012-03-06 at 23:26 +0800, Lai Jiangshan wrote:
> On Tue, Mar 6, 2012 at 7:16 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> > On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> >>         srcu_head is bigger, it is worth, it provides more ability and simplify
> >>         the srcu code.
> >
> > Dubious claim.. memory footprint of various data structures is deemed
> > important. rcu_head is 16 bytes, srcu_head is 32 bytes. I think it would
> > be real nice not to have two different callback structures and not grow
> > them as large.
> 
> Also "Dubious" "simplify"?

Nah, I meant dubious. 

> bigger struct makes we can store ->check_seq in the callback, makes
> the processing simply.
> 
> bigger struct is not required here, we can use a little complex way(batches)
> to do it.

Right, just keep a callback list for each state, and once you fall off
the end process it. No need to keep per callback state.
Peter Zijlstra March 6, 2012, 3:38 p.m. UTC | #20
On Tue, 2012-03-06 at 23:17 +0800, Lai Jiangshan wrote:
> On Tue, Mar 6, 2012 at 6:58 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> > On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> >>  /*
> >> + * 'return left < right;' but handle the overflow issues.
> >> + * The same as 'return (long)(right - left) > 0;' but it cares more.
> >
> > About what? And why? We do the (long)(a - b) thing all over the kernel,
> > why would you care more?
> 
> @left is constants of  the callers(callbacks's snapshot), @right
> increases very slow.
> if (long)(right - left) is a big negative, we have to wait for a long
> time in this kinds of overflow.
> this kinds of overflow can not happen in this safe_less_than()

I'm afraid I'm being particularly dense, but what?!
Lai Jiangshan March 7, 2012, 6:44 a.m. UTC | #21
On 03/06/2012 11:32 PM, Peter Zijlstra wrote:
> On Tue, 2012-03-06 at 22:44 +0800, Lai Jiangshan wrote:
>> it is srcu_barrier(), it have to wait all callbacks complete for all
>> cpus since it is per-cpu
>> implementation.
>>
> If you enqueue each callback as a work, flush_workqueue() alone should
> do that. It'll wait for completion of all currently enqueued works but
> ignores new works.

Callbacks are not queued on the workqueue at call_srcu() time.
They are delivered (queued on the workqueue) after a grace period,
so we need to wait for all of them to be delivered before flush_workqueue().

Gilad Ben-Yossef March 7, 2012, 8:10 a.m. UTC | #22
On Tue, Mar 6, 2012 at 4:44 PM, Lai Jiangshan <eag0628@gmail.com> wrote:
> On Tue, Mar 6, 2012 at 7:52 PM, Peter Zijlstra <peterz@infradead.org> wrote:
>> On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>>> +void srcu_barrier(struct srcu_struct *sp)
>>> +{
>>> +       struct srcu_sync sync;
>>> +       struct srcu_head *head = &sync.head;
>>> +       unsigned long chck_seq; /* snap */
>>> +
>>> +       int idle_loop = 0;
>>> +       int cpu;
>>> +       struct srcu_cpu_struct *scp;
>>> +
>>> +       spin_lock_irq(&sp->gp_lock);
>>> +       chck_seq = sp->chck_seq;
>>> +       for_each_possible_cpu(cpu) {
>>
>> ARGH!! this is really not ok.. so we spend all this time killing
>> srcu_sync_expidited and co because they prod at all cpus for no good
>> reason, and what do you do?
>
> it is srcu_barrier(), it have to wait all callbacks complete for all
> cpus since it is per-cpu
> implementation.

I would say it only needs to wait for callbacks to complete for all
CPUs that has a callback pending.

Unless I misunderstood something, that is what your code does already
- it does not wait for completion,
or schedules a work on a CPU that does not has a callback pending, right?

>
>>
>> Also, what happens if your cpu isn't actually online?
>
> The workqueue handles it, not here, if a cpu state machine has callbacks, the
> state machine is started, if it has no callback,  srcu_barrier() does
> nothing for
> this cpu

I understand the point is that offline cpus wont have callbacks, so
nothing would be
done for them, but still, is that a reason to even check? why not use
for_each_online_cpu

I think that if a cpu that was offline went online after your check
and managed to get an
SRCU callback pending it is by definition not a callback srcu_barrier
needs to wait for
since it went pending at a later time then srcu_barrier was called. Or
have I missed something?

Thanks,
Gilad
Lai Jiangshan March 7, 2012, 9:21 a.m. UTC | #23
On 03/07/2012 04:10 PM, Gilad Ben-Yossef wrote:
> On Tue, Mar 6, 2012 at 4:44 PM, Lai Jiangshan <eag0628@gmail.com> wrote:
>> On Tue, Mar 6, 2012 at 7:52 PM, Peter Zijlstra <peterz@infradead.org> wrote:
>>> On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>>>> +void srcu_barrier(struct srcu_struct *sp)
>>>> +{
>>>> +       struct srcu_sync sync;
>>>> +       struct srcu_head *head = &sync.head;
>>>> +       unsigned long chck_seq; /* snap */
>>>> +
>>>> +       int idle_loop = 0;
>>>> +       int cpu;
>>>> +       struct srcu_cpu_struct *scp;
>>>> +
>>>> +       spin_lock_irq(&sp->gp_lock);
>>>> +       chck_seq = sp->chck_seq;
>>>> +       for_each_possible_cpu(cpu) {
>>>
>>> ARGH!! this is really not ok.. so we spend all this time killing
>>> srcu_sync_expidited and co because they prod at all cpus for no good
>>> reason, and what do you do?
>>
>> it is srcu_barrier(), it have to wait all callbacks complete for all
>> cpus since it is per-cpu
>> implementation.
> 
> I would say it only needs to wait for callbacks to complete for all
> CPUs that has a callback pending.

Right.
The code above flush_workqueue() waits until all of them are delivered.
flush_workqueue() waits until all of them are completely invoked.

> 
> Unless I misunderstood something, that is what your code does already
> - it does not wait for completion,
> or schedules a work on a CPU that does not has a callback pending, right?
> 
>>
>>>
>>> Also, what happens if your cpu isn't actually online?
>>
>> The workqueue handles it, not here, if a cpu state machine has callbacks, the
>> state machine is started, if it has no callback,  srcu_barrier() does
>> nothing for
>> this cpu
> 
> I understand the point is that offline cpus wont have callbacks, so
> nothing would be
> done for them, but still, is that a reason to even check? why not use
> for_each_online_cpu

It is possible for offline CPUs to have callbacks pending during hot-plugging.

> 
> I think that if a cpu that was offline went online after your check
> and managed to get an
> SRCU callback pending it is by definition not a callback srcu_barrier
> needs to wait for
> since it went pending at a later time then srcu_barrier was called. Or
> have I missed something?
> 
> Thanks,
> Gilad
>
Paul E. McKenney March 8, 2012, 7:44 p.m. UTC | #24
On Tue, Mar 06, 2012 at 11:47:25AM +0100, Peter Zijlstra wrote:
> On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> > o       The srcu callback is new thing, I hope it is completely preemptible,
> >         even sleepable. It does in this implemetation, I use work_struct
> >         to stand for every srcu callback. 
> 
> I didn't need the callbacks to sleep too, I just needed the read-side
> srcu bit.
> 
> There's an argument against making the callbacks able to sleep like that
> in that you typically want to minimize the amount of work done in the
> callbacks, allowing them to sleep invites to callbacks that do _way_ too
> much work.
> 
> I haven't made my mind up if I care yet.. :-)

I prefer that they don't sleep.  Easy to push anything that needs to
sleep off to a work queue.  And allowing sleeping in an SRCU callback
function sounds like something that could cause serious problems down
the road.
Paul E. McKenney March 8, 2012, 7:49 p.m. UTC | #25
On Tue, Mar 06, 2012 at 04:38:18PM +0100, Peter Zijlstra wrote:
> On Tue, 2012-03-06 at 23:17 +0800, Lai Jiangshan wrote:
> > On Tue, Mar 6, 2012 at 6:58 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> > > On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> > >>  /*
> > >> + * 'return left < right;' but handle the overflow issues.
> > >> + * The same as 'return (long)(right - left) > 0;' but it cares more.
> > >
> > > About what? And why? We do the (long)(a - b) thing all over the kernel,
> > > why would you care more?
> > 
> > @left is constants of  the callers(callbacks's snapshot), @right
> > increases very slow.
> > if (long)(right - left) is a big negative, we have to wait for a long
> > time in this kinds of overflow.
> > this kinds of overflow can not happen in this safe_less_than()
> 
> I'm afraid I'm being particularly dense, but what?!

I have been converting the "(long)(a - b)" stuff in RCU to use unsigned
arithmetic.  The ULONG_CMP_GE() and friends in rcupdate.h are for this
purpose.

I too have used (long)(a - b) for a long time, but I saw with my own eyes
the glee in the compiler-writers' eyes when they discussed signed overflow
being undefined in the C standard.  I believe that the reasons for signed
overflow being undefined are long obsolete, but better safe than sorry.

							Thanx, Paul
Paul E. McKenney March 8, 2012, 7:58 p.m. UTC | #26
On Tue, Mar 06, 2012 at 04:34:53PM +0100, Peter Zijlstra wrote:
> On Tue, 2012-03-06 at 23:12 +0800, Lai Jiangshan wrote:
> > On Tue, Mar 6, 2012 at 7:16 PM, Peter Zijlstra <peterz@infradead.org> wrote:
> > > On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
> > >>         srcu_head is bigger, it is worth, it provides more ability and simplify
> > >>         the srcu code.
> > >
> > > Dubious claim.. memory footprint of various data structures is deemed
> > > important. rcu_head is 16 bytes, srcu_head is 32 bytes. I think it would
> > > be real nice not to have two different callback structures and not grow
> > > them as large.
> > 
> > CC: tj@kernel.org
> > It could be better if workqueue also supports 2*sizeof(long) work callbacks.
> 
> That's going to be very painful if at all possible.
> 
> > I prefer ability/functionality a little more, it eases the caller's pain.
> > preemptible callbacks also eases the pressure of the whole system.
> > But I'm also ok if we limit the srcu-callbacks in softirq.
> 
> You don't have to use softirq, you could run a complete list from a
> single worklet. Just keep the single linked rcu_head list and enqueue a
> static (per-cpu) worker to process the entire list.

I like the idea of SRCU using rcu_head.  I am a little concerned about
what happens when there are lots of SRCU callbacks, but am willing to
wait to solve those problems until the situation arises.

But I guess I should ask...  Peter, what do you expect the maximum
call_srcu() rate to be in your use cases?  If tens of thousands are
possible, some adjustments will be needed.

							Thanx, Paul
Lai Jiangshan March 10, 2012, 3:32 a.m. UTC | #27
On Fri, Mar 9, 2012 at 3:58 AM, Paul E. McKenney
<paulmck@linux.vnet.ibm.com> wrote:
> On Tue, Mar 06, 2012 at 04:34:53PM +0100, Peter Zijlstra wrote:
>> On Tue, 2012-03-06 at 23:12 +0800, Lai Jiangshan wrote:
>> > On Tue, Mar 6, 2012 at 7:16 PM, Peter Zijlstra <peterz@infradead.org> wrote:
>> > > On Tue, 2012-03-06 at 17:57 +0800, Lai Jiangshan wrote:
>> > >>         srcu_head is bigger, it is worth, it provides more ability and simplify
>> > >>         the srcu code.
>> > >
>> > > Dubious claim.. memory footprint of various data structures is deemed
>> > > important. rcu_head is 16 bytes, srcu_head is 32 bytes. I think it would
>> > > be real nice not to have two different callback structures and not grow
>> > > them as large.
>> >
>> > CC: tj@kernel.org
>> > It could be better if workqueue also supports 2*sizeof(long) work callbacks.
>>
>> That's going to be very painful if at all possible.
>>
>> > I prefer ability/functionality a little more, it eases the caller's pain.
>> > preemptible callbacks also eases the pressure of the whole system.
>> > But I'm also ok if we limit the srcu-callbacks in softirq.
>>
>> You don't have to use softirq, you could run a complete list from a
>> single worklet. Just keep the single linked rcu_head list and enqueue a
>> static (per-cpu) worker to process the entire list.
>
> I like the idea of SRCU using rcu_head.  I am a little concerned about
> what happens when there are lots of SRCU callbacks, but am willing to
> wait to solve those problems until the situation arises.
>
> But I guess I should ask...  Peter, what do you expect the maximum
> call_srcu() rate to be in your use cases?  If tens of thousands are
> possible, some adjustments will be needed.

Different from RCU (a single global domain), SRCU is split into separate
domains, so I think the rate will not be high for a given domain,
but the real answer depends on practice.

>
>                                                        Thanx, Paul
>
Peter Zijlstra March 10, 2012, 10:09 a.m. UTC | #28
On Thu, 2012-03-08 at 11:58 -0800, Paul E. McKenney wrote:
> 
> But I guess I should ask...  Peter, what do you expect the maximum
> call_srcu() rate to be in your use cases?  If tens of thousands are
> possible, some adjustments will be needed. 

The one call-site I currently have is linked to vma lifetimes, so yeah,
I guess that that can be lots.
Peter Zijlstra March 10, 2012, 10:12 a.m. UTC | #29
On Thu, 2012-03-08 at 11:49 -0800, Paul E. McKenney wrote:
> 
> I too have used (long)(a - b) for a long time, but I saw with my own eyes
> the glee in the compiler-writers' eyes when they discussed signed overflow
> being undefined in the C standard.  I believe that the reasons for signed
> overflow being undefined are long obsolete, but better safe than sorry. 

Thing is, if they break that the whole kernel comes falling down, I
really wouldn't worry about RCU at that point. But to each their
pet-paranoia I guess ;-)
Paul E. McKenney March 12, 2012, 5:52 p.m. UTC | #30
On Sat, Mar 10, 2012 at 11:12:29AM +0100, Peter Zijlstra wrote:
> On Thu, 2012-03-08 at 11:49 -0800, Paul E. McKenney wrote:
> > 
> > I too have used (long)(a - b) for a long time, but I saw with my own eyes
> > the glee in the compiler-writers' eyes when they discussed signed overflow
> > being undefined in the C standard.  I believe that the reasons for signed
> > overflow being undefined are long obsolete, but better safe than sorry. 
> 
> Thing is, if they break that the whole kernel comes falling down, I
> really wouldn't worry about RCU at that point. But to each their
> pet-paranoia I guess ;-)

But just because I am paranoid doesn't mean that no one is after me!  ;-)

I agree that the compiler guys would need to provide a chicken switch
due to the huge amount of code that relies on (long)(a - b) handling
overflow reasonably.  But avoiding signed integer overflow is pretty
straightforward.  For example, I use the following in RCU:

	#define UINT_CMP_GE(a, b)	(UINT_MAX / 2 >= (a) - (b))
	#define UINT_CMP_LT(a, b)	(UINT_MAX / 2 < (a) - (b))
	#define ULONG_CMP_GE(a, b)	(ULONG_MAX / 2 >= (a) - (b))
	#define ULONG_CMP_LT(a, b)	(ULONG_MAX / 2 < (a) - (b))

But yes, part of the reason for my doing this was to make conversations
with the usual standards-committee suspects go more smoothly.

							Thanx, Paul
Paul E. McKenney March 12, 2012, 5:54 p.m. UTC | #31
On Sat, Mar 10, 2012 at 11:09:53AM +0100, Peter Zijlstra wrote:
> On Thu, 2012-03-08 at 11:58 -0800, Paul E. McKenney wrote:
> > 
> > But I guess I should ask...  Peter, what do you expect the maximum
> > call_srcu() rate to be in your use cases?  If tens of thousands are
> > possible, some adjustments will be needed. 
> 
> The one call-site I currently have is linked to vma lifetimes, so yeah,
> I guess that that can be lots.

So the worst case would be if several processes with lots of VMAs were
to exit at about the same time?  If so, my guess is that call_srcu()
needs to handle several thousand callbacks showing up within a few
tens of microseconds.  Is that a reasonable assumption, or am I missing
an order of magnitude or two in either direction?

							Thanx, Paul
Peter Zijlstra March 12, 2012, 5:58 p.m. UTC | #32
On Mon, 2012-03-12 at 10:54 -0700, Paul E. McKenney wrote:
> On Sat, Mar 10, 2012 at 11:09:53AM +0100, Peter Zijlstra wrote:
> > On Thu, 2012-03-08 at 11:58 -0800, Paul E. McKenney wrote:
> > > 
> > > But I guess I should ask...  Peter, what do you expect the maximum
> > > call_srcu() rate to be in your use cases?  If tens of thousands are
> > > possible, some adjustments will be needed. 
> > 
> > The one call-site I currently have is linked to vma lifetimes, so yeah,
> > I guess that that can be lots.
> 
> So the worst case would be if several processes with lots of VMAs were
> to exit at about the same time?  If so, my guess is that call_srcu()
> needs to handle several thousand callbacks showing up within a few
> tens of microseconds.  Is that a reasonable assumption, or am I missing
> an order of magnitude or two in either direction?

That or a process is doing mmap/munmap loops (some file scanners are
known to do this). But yeah, that can be lots.

My current use case doesn't quite trigger since it needs another syscall
to attach something to a vma before vma tear-down actually ends up
calling call_srcu() so in practice it won't be much at all (for now).

Still I think its prudent to make it (srcu callbacks) deal with plenty
callbacks even if initially there won't be many -- who knows what other
people will do while you're not paying attention... :-)
Paul E. McKenney March 12, 2012, 6:32 p.m. UTC | #33
On Mon, Mar 12, 2012 at 06:58:17PM +0100, Peter Zijlstra wrote:
> On Mon, 2012-03-12 at 10:54 -0700, Paul E. McKenney wrote:
> > On Sat, Mar 10, 2012 at 11:09:53AM +0100, Peter Zijlstra wrote:
> > > On Thu, 2012-03-08 at 11:58 -0800, Paul E. McKenney wrote:
> > > > 
> > > > But I guess I should ask...  Peter, what do you expect the maximum
> > > > call_srcu() rate to be in your use cases?  If tens of thousands are
> > > > possible, some adjustments will be needed. 
> > > 
> > > The one call-site I currently have is linked to vma lifetimes, so yeah,
> > > I guess that that can be lots.
> > 
> > So the worst case would be if several processes with lots of VMAs were
> > to exit at about the same time?  If so, my guess is that call_srcu()
> > needs to handle several thousand callbacks showing up within a few
> > tens of microseconds.  Is that a reasonable assumption, or am I missing
> > an order of magnitude or two in either direction?
> 
> That or a process is doing mmap/munmap loops (some file scanners are
> known to do this). But yeah, that can be lots.
> 
> My current use case doesn't quite trigger since it needs another syscall
> to attach something to a vma before vma tear-down actually ends up
> calling call_srcu() so in practice it won't be much at all (for now).
> 
> Still I think its prudent to make it (srcu callbacks) deal with plenty
> callbacks even if initially there won't be many -- who knows what other
> people will do while you're not paying attention... :-)

And another question I should have asked to begin with...  Would each
VMA have its own SRCU domain, or are you thinking in terms of one
SRCU domain for all VMAs globally?

If the latter, that pushes pretty strongly for per-CPU SRCU callback
lists.  Which brings up srcu_barrier() scalability (and yes, I am working
on rcu_barrier() scalability).  One way to handle this at least initially
is to have srcu_barrier() avoid enqueueing callbacks on CPUs whose
callback lists are empty.  In addition, if the loop over all CPUs is
preemptible, then there should not be much in the way of realtime issues.

The memory-ordering issues should be OK -- if a given CPU's list was
empty at any time during srcu_barrier()'s execution, then there is no
need for a callback on that CPU.

							Thanx, Paul
Peter Zijlstra March 12, 2012, 8:25 p.m. UTC | #34
On Mon, 2012-03-12 at 11:32 -0700, Paul E. McKenney wrote:
> And another question I should have asked to begin with...  Would each
> VMA have its own SRCU domain, or are you thinking in terms of one
> SRCU domain for all VMAs globally?

The latter, single domain for all objects.

> If the latter, that pushes pretty strongly for per-CPU SRCU callback
> lists. 

Agreed. I was under the impression the proposed thing had this, but on
looking at it again it does not. Shouldn't be hard to add though.

>  Which brings up srcu_barrier() scalability (and yes, I am working
> on rcu_barrier() scalability).  One way to handle this at least initially
> is to have srcu_barrier() avoid enqueueing callbacks on CPUs whose
> callback lists are empty.  In addition, if the loop over all CPUs is
> preemptible, then there should not be much in the way of realtime issues.

Why do we have rcu_barrier() and how is it different from
synchronize_rcu()?
Paul E. McKenney March 12, 2012, 11:15 p.m. UTC | #35
On Mon, Mar 12, 2012 at 09:25:16PM +0100, Peter Zijlstra wrote:
> On Mon, 2012-03-12 at 11:32 -0700, Paul E. McKenney wrote:
> > And another question I should have asked to begin with...  Would each
> > VMA have its own SRCU domain, or are you thinking in terms of one
> > SRCU domain for all VMAs globally?
> 
> The latter, single domain for all objects.

OK.

> > If the latter, that pushes pretty strongly for per-CPU SRCU callback
> > lists. 
> 
> Agreed. I was under the impression the proposed thing had this, but on
> looking at it again it does not. Shouldn't be hard to add though.

Agreed, but see srcu_barrier()...

> >  Which brings up srcu_barrier() scalability (and yes, I am working
> > on rcu_barrier() scalability).  One way to handle this at least initially
> > is to have srcu_barrier() avoid enqueueing callbacks on CPUs whose
> > callback lists are empty.  In addition, if the loop over all CPUs is
> > preemptible, then there should not be much in the way of realtime issues.
> 
> Why do we have rcu_barrier() and how is it different from
> synchronize_rcu()?

We need rcu_barrier() in order to be able to safely unload modules that
use call_rcu().  If a module fails to invoke rcu_barrier() between its
last call_rcu() and its unloading, then its RCU callbacks can be fatally
disappointed to learn that their callback functions are no longer in
memory.  See http://lwn.net/Articles/202847/ for more info.

While synchronize_rcu() waits only for a grace period, rcu_barrier()
waits for all pre-existing RCU callbacks to be invoked.  There is also
an rcu_barrier_bh() and rcu_barrier_sched().

Of course, if all uses of call_srcu() were to be from the main kernel
(as opposed to from a module), then there would be no need for a
srcu_barrier().  But it seems quite likely that if a call_srcu() is
available, its use from a module won't be far behind -- especially
given that rcutorture is normally built as a kernel module.
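
As a sketch (hypothetical module, assuming the call_srcu()/srcu_barrier() API
from this patch), the unload-safe pattern would be:

	#include <linux/module.h>
	#include <linux/srcu.h>

	static struct srcu_struct my_srcu;

	static int __init my_mod_init(void)
	{
		return init_srcu_struct(&my_srcu);
	}

	static void __exit my_mod_exit(void)
	{
		/*
		 * Wait for every callback queued by this module's call_srcu()
		 * calls to be invoked before the callback code is unloaded.
		 */
		srcu_barrier(&my_srcu);
		cleanup_srcu_struct(&my_srcu);
	}

	module_init(my_mod_init);
	module_exit(my_mod_exit);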

							Thanx, Paul
Peter Zijlstra March 12, 2012, 11:18 p.m. UTC | #36
On Mon, 2012-03-12 at 16:15 -0700, Paul E. McKenney wrote:
> We need rcu_barrier() in order to be able to safely unload modules
> that
> use call_rcu(). 

Oh, right.. I always forget about modules.
Paul E. McKenney March 12, 2012, 11:38 p.m. UTC | #37
On Tue, Mar 13, 2012 at 12:18:39AM +0100, Peter Zijlstra wrote:
> On Mon, 2012-03-12 at 16:15 -0700, Paul E. McKenney wrote:
> > We need rcu_barrier() in order to be able to safely unload modules
> > that
> > use call_rcu(). 
> 
> Oh, right.. I always forget about modules. 

Lucky you!  ;-)

(Sorry, couldn't resist...)

							Thanx, Paul

Patch

diff --git a/include/linux/srcu.h b/include/linux/srcu.h
index df8f5f7..c375985 100644
--- a/include/linux/srcu.h
+++ b/include/linux/srcu.h
@@ -29,6 +29,22 @@ 
 
 #include <linux/mutex.h>
 #include <linux/rcupdate.h>
+#include <linux/workqueue.h>
+
+struct srcu_head;
+typedef void (*srcu_callback_func_t)(struct srcu_head *head);
+
+struct srcu_head {
+	union {
+		struct {
+			struct srcu_head *next;
+			/* the snap of sp->chck_seq when queued */
+			unsigned long chck_seq;
+			srcu_callback_func_t func;
+		};
+		struct work_struct work;
+	};
+};
 
 struct srcu_struct_array {
 	unsigned long c[2];
@@ -39,10 +55,32 @@  struct srcu_struct_array {
 #define SRCU_REF_MASK		(ULONG_MAX >> SRCU_USAGE_BITS)
 #define SRCU_USAGE_COUNT	(SRCU_REF_MASK + 1)
 
+struct srcu_cpu_struct;
+
 struct srcu_struct {
 	unsigned completed;
+
+	/* the sequence of check zero */
+	unsigned long chck_seq;
+	/* the snap of chck_seq when the last callback is queued */
+	unsigned long callback_chck_seq;
+
+	/* the sequence value of succeed check(chck_seq) */
+	unsigned long zero_seq[2];
+
+	/* protects the above completed and sequence values */
+	spinlock_t gp_lock;
+
+	/* protects all the fields here except callback_chck_seq */
+	struct mutex flip_check_mutex;
+	/*
+	 * Fileds of the intersection of gp_lock-protected fields and
+	 * flip_check_mutex-protected fields can only be modified with
+	 * the two lock are both held, can be read with one of lock held.
+	 */
+
 	struct srcu_struct_array __percpu *per_cpu_ref;
-	struct mutex mutex;
+	struct srcu_cpu_struct __percpu *srcu_per_cpu;
 	unsigned long snap[NR_CPUS];
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 	struct lockdep_map dep_map;
@@ -236,4 +274,7 @@  static inline void srcu_read_unlock_raw(struct srcu_struct *sp, int idx)
 	local_irq_restore(flags);
 }
 
+void call_srcu(struct srcu_struct *sp, struct srcu_head *head,
+		srcu_callback_func_t func);
+void srcu_barrier(struct srcu_struct *sp);
 #endif
diff --git a/kernel/srcu.c b/kernel/srcu.c
index d101ed5..8c71ae8 100644
--- a/kernel/srcu.c
+++ b/kernel/srcu.c
@@ -33,13 +33,63 @@ 
 #include <linux/smp.h>
 #include <linux/delay.h>
 #include <linux/srcu.h>
+#include <linux/completion.h>
+
+#define SRCU_CALLBACK_BATCH	10
+#define SRCU_INTERVAL		1
+
+/* protected by sp->gp_lock */
+struct srcu_cpu_struct {
+	/* callback queue for handling */
+	struct srcu_head *head, **tail;
+
+	/* the struct srcu_struct of this struct srcu_cpu_struct */
+	struct srcu_struct *sp;
+	struct delayed_work work;
+};
+
+/*
+ * State machine process for every CPU, it may run on wrong CPU
+ * during hotplugging or synchronize_srcu() scheldule it after
+ * migrated.
+ */
+static void process_srcu_cpu_struct(struct work_struct *work);
+
+static struct workqueue_struct *srcu_callback_wq;
 
 static int init_srcu_struct_fields(struct srcu_struct *sp)
 {
+	int cpu;
+	struct srcu_cpu_struct *scp;
+
+	mutex_init(&sp->flip_check_mutex);
+	spin_lock_init(&sp->gp_lock);
 	sp->completed = 0;
-	mutex_init(&sp->mutex);
+	sp->chck_seq = 0;
+	sp->callback_chck_seq = 0;
+	sp->zero_seq[0] = 0;
+	sp->zero_seq[1] = 0;
+
 	sp->per_cpu_ref = alloc_percpu(struct srcu_struct_array);
-	return sp->per_cpu_ref ? 0 : -ENOMEM;
+	if (!sp->per_cpu_ref)
+		return -ENOMEM;
+
+	sp->srcu_per_cpu = alloc_percpu(struct srcu_cpu_struct);
+	if (!sp->srcu_per_cpu) {
+		free_percpu(sp->per_cpu_ref);
+		return -ENOMEM;
+	}
+
+	for_each_possible_cpu(cpu) {
+		scp = per_cpu_ptr(sp->srcu_per_cpu, cpu);
+
+		scp->sp = sp;
+		scp->head = NULL;
+		scp->tail = &scp->head;
+		INIT_DELAYED_WORK(&scp->work, process_srcu_cpu_struct);
+	}
+
+	return 0;
 }
 
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
@@ -208,6 +258,8 @@  void cleanup_srcu_struct(struct srcu_struct *sp)
 		return;
 	free_percpu(sp->per_cpu_ref);
 	sp->per_cpu_ref = NULL;
+	free_percpu(sp->srcu_per_cpu);
+	sp->srcu_per_cpu = NULL;
 }
 EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
 
@@ -247,6 +299,19 @@  void __srcu_read_unlock(struct srcu_struct *sp, int idx)
 EXPORT_SYMBOL_GPL(__srcu_read_unlock);
 
 /*
+ * 'return left < right;' but handle the overflow issues.
+ * The same as 'return (long)(right - left) > 0;' but it cares more.
+ */
+static inline
+bool safe_less_than(unsigned long left, unsigned long right, unsigned long max)
+{
+	unsigned long a = right - left;
+	unsigned long b = max - left;
+
+	return !!a && (a <= b);
+}
+
+/*
  * We use an adaptive strategy for synchronize_srcu() and especially for
  * synchronize_srcu_expedited().  We spin for a fixed time period
  * (defined below) to allow SRCU readers to exit their read-side critical
@@ -254,11 +319,11 @@  EXPORT_SYMBOL_GPL(__srcu_read_unlock);
  * we repeatedly block for 1-millisecond time periods.  This approach
  * has done well in testing, so there is no need for a config parameter.
  */
-#define SYNCHRONIZE_SRCU_READER_DELAY	5
+#define SRCU_RETRY_CHECK_DELAY		5
 #define SYNCHRONIZE_SRCU_TRYCOUNT	2
 #define SYNCHRONIZE_SRCU_EXP_TRYCOUNT	12
 
-static void wait_idx(struct srcu_struct *sp, int idx, int trycount)
+static bool do_check_zero_idx(struct srcu_struct *sp, int idx, int trycount)
 {
 	/*
 	 * If a reader fetches the index before the ->completed increment,
@@ -271,19 +336,12 @@  static void wait_idx(struct srcu_struct *sp, int idx, int trycount)
 	 */
 	smp_mb(); /* D */
 
-	/*
-	 * SRCU read-side critical sections are normally short, so wait
-	 * a small amount of time before possibly blocking.
-	 */
-	if (!srcu_readers_active_idx_check(sp, idx)) {
-		udelay(SYNCHRONIZE_SRCU_READER_DELAY);
-		while (!srcu_readers_active_idx_check(sp, idx)) {
-			if (trycount > 0) {
-				trycount--;
-				udelay(SYNCHRONIZE_SRCU_READER_DELAY);
-			} else
-				schedule_timeout_interruptible(1);
-		}
+	for (;;) {
+		if (srcu_readers_active_idx_check(sp, idx))
+			break;
+		if (--trycount <= 0)
+			return false;
+		udelay(SRCU_RETRY_CHECK_DELAY);
 	}
 
 	/*
@@ -297,6 +355,8 @@  static void wait_idx(struct srcu_struct *sp, int idx, int trycount)
 	 * the next flipping.
 	 */
 	smp_mb(); /* E */
+
+	return true;
 }
 
 /*
@@ -308,65 +368,234 @@  static void srcu_flip(struct srcu_struct *sp)
 	ACCESS_ONCE(sp->completed)++;
 }
 
+/* Must called with sp->flip_check_mutex and sp->gp_lock held; */
+static bool check_zero_idx(struct srcu_struct *sp, int idx,
+		struct srcu_head *head, int trycount)
+{
+	unsigned long chck_seq;
+	bool checked_zero;
+
+	/* find out the check sequence number for this check */
+	if (sp->chck_seq == sp->callback_chck_seq ||
+	    sp->chck_seq == head->chck_seq)
+		sp->chck_seq++;
+	chck_seq = sp->chck_seq;
+
+	spin_unlock_irq(&sp->gp_lock);
+	checked_zero = do_check_zero_idx(sp, idx, trycount);
+	spin_lock_irq(&sp->gp_lock);
+
+	if (!checked_zero)
+		return false;
+
+	/* commit the succeed check */
+	sp->zero_seq[idx] = chck_seq;
+
+	return true;
+}
+
+/*
+ * Are the @head completed? will try do check zero when it is not.
+ *
+ * Must be called with sp->flip_check_mutex and sp->gp_lock held;
+ * Must be called from process contex, because the check may be long
+ * The sp->gp_lock may be released and regained.
+ */
+static bool complete_head_flip_check(struct srcu_struct *sp,
+		struct srcu_head *head, int trycount)
+{
+	int idxb = sp->completed & 0X1UL;
+	int idxa = 1 - idxb;
+	unsigned long h  = head->chck_seq;
+	unsigned long za = sp->zero_seq[idxa];
+	unsigned long zb = sp->zero_seq[idxb];
+	unsigned long s  = sp->chck_seq;
+
+	if (!safe_less_than(h, za, s)) {
+		if (!check_zero_idx(sp, idxa, head, trycount))
+			return false;
+	}
+
+	if (!safe_less_than(h, zb, s)) {
+		srcu_flip(sp);
+		trycount = trycount < 2 ? 2 : trycount;
+		return check_zero_idx(sp, idxb, head, trycount);
+	}
+
+	return true;
+}
+
+/*
+ * Are the @head completed?
+ *
+ * Must called with sp->gp_lock held;
+ * srcu_queue_callback() and check_zero_idx() ensure (s - z0) and (s - z1)
+ * less than (ULONG_MAX / sizof(struct srcu_head)). There is at least one
+ * callback queued for each seq in (z0, s) and (z1, s). The same for
+ * za, zb, s in complete_head_flip_check().
+ */
+static bool complete_head(struct srcu_struct *sp, struct srcu_head *head)
+{
+	unsigned long h  = head->chck_seq;
+	unsigned long z0 = sp->zero_seq[0];
+	unsigned long z1 = sp->zero_seq[1];
+	unsigned long s  = sp->chck_seq;
+
+	return safe_less_than(h, z0, s) && safe_less_than(h, z1, s);
+}
+
+static void process_srcu_cpu_struct(struct work_struct *work)
+{
+	int i;
+	int can_flip_check;
+	struct srcu_head *head;
+	struct srcu_cpu_struct *scp;
+	struct srcu_struct *sp;
+	work_func_t wfunc;
+
+	scp = container_of(work, struct srcu_cpu_struct, work.work);
+	sp = scp->sp;
+
+	can_flip_check = mutex_trylock(&sp->flip_check_mutex);
+	spin_lock_irq(&sp->gp_lock);
+
+	for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
+		head = scp->head;
+		if (!head)
+			break;
+
+		/* Test whether the head is completed or not. */
+		if (can_flip_check) {
+			if (!complete_head_flip_check(sp, head, 1))
+				break;
+		} else {
+			if (!complete_head(sp, head))
+				break;
+		}
+
+		/* dequeue the completed callback */
+		scp->head = head->next;
+		if (!scp->head)
+			scp->tail = &scp->head;
+
+		/* deliver the callback, will be invoked in workqueue */
+		BUILD_BUG_ON(offsetof(struct srcu_head, work) != 0);
+		wfunc = (work_func_t)head->func;
+		INIT_WORK(&head->work, wfunc);
+		queue_work(srcu_callback_wq, &head->work);
+	}
+	if (scp->head)
+		schedule_delayed_work(&scp->work, SRCU_INTERVAL);
+	spin_unlock_irq(&sp->gp_lock);
+	if (can_flip_check)
+		mutex_unlock(&sp->flip_check_mutex);
+}
+
+static
+void srcu_queue_callback(struct srcu_struct *sp, struct srcu_cpu_struct *scp,
+                struct srcu_head *head, srcu_callback_func_t func)
+{
+	head->next = NULL;
+	head->func = func;
+	head->chck_seq = sp->chck_seq;
+	sp->callback_chck_seq = sp->chck_seq;
+	*scp->tail = head;
+	scp->tail = &head->next;
+}
+
+void call_srcu(struct srcu_struct *sp, struct srcu_head *head,
+		srcu_callback_func_t func)
+{
+	unsigned long flags;
+	int cpu = get_cpu();
+	struct srcu_cpu_struct *scp = per_cpu_ptr(sp->srcu_per_cpu, cpu);
+
+	spin_lock_irqsave(&sp->gp_lock, flags);
+	srcu_queue_callback(sp, scp, head, func);
+	/* start state machine when this is the head */
+	if (scp->head == head)
+		schedule_delayed_work(&scp->work, 0);
+	spin_unlock_irqrestore(&sp->gp_lock, flags);
+	put_cpu();
+}
+EXPORT_SYMBOL_GPL(call_srcu);
+
+struct srcu_sync {
+	struct srcu_head head;
+	struct completion completion;
+};
+
+static void __synchronize_srcu_callback(struct srcu_head *head)
+{
+	struct srcu_sync *sync = container_of(head, struct srcu_sync, head);
+
+	complete(&sync->completion);
+}
+
 /*
  * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
  */
-static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
+static void __synchronize_srcu(struct srcu_struct *sp, int try_count)
 {
+	struct srcu_sync sync;
+	struct srcu_head *head = &sync.head;
+	struct srcu_head **orig_tail;
+	int cpu = raw_smp_processor_id();
+	struct srcu_cpu_struct *scp = per_cpu_ptr(sp->srcu_per_cpu, cpu);
+	bool started;
+
 	rcu_lockdep_assert(!lock_is_held(&sp->dep_map) &&
 			   !lock_is_held(&rcu_bh_lock_map) &&
 			   !lock_is_held(&rcu_lock_map) &&
 			   !lock_is_held(&rcu_sched_lock_map),
 			   "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section");
 
-	mutex_lock(&sp->mutex);
+	init_completion(&sync.completion);
+	if (!mutex_trylock(&sp->flip_check_mutex)) {
+		call_srcu(sp, &sync.head, __synchronize_srcu_callback);
+		goto wait;
+	}
+
+	spin_lock_irq(&sp->gp_lock);
+	started = !!scp->head;
+	orig_tail = scp->tail;
+	srcu_queue_callback(sp, scp, head, __synchronize_srcu_callback);
+
+	/* fast path */
+	if (complete_head_flip_check(sp, head, try_count)) {
+		/*
+		 * dequeue @head, we hold flip_check_mutex, the previous
+		 * node stays or all prevous node are all dequeued.
+		 */
+		if (scp->head == head)
+			orig_tail = &scp->head;
+		*orig_tail = head->next;
+		if (*orig_tail == NULL)
+			scp->tail = orig_tail;
+
+		/*
+		 * start state machine if this is the head and some callback(s)
+		 * are queued when we do check_zero(they have not started it).
+		 */
+		if (!started && scp->head)
+			schedule_delayed_work(&scp->work, 0);
+
+		/* we done! */
+		spin_unlock_irq(&sp->gp_lock);
+		mutex_unlock(&sp->flip_check_mutex);
+		return;
+	}
 
-	/*
-	 * Suppose that during the previous grace period, a reader
-	 * picked up the old value of the index, but did not increment
-	 * its counter until after the previous instance of
-	 * __synchronize_srcu() did the counter summation and recheck.
-	 * That previous grace period was OK because the reader did
-	 * not start until after the grace period started, so the grace
-	 * period was not obligated to wait for that reader.
-	 *
-	 * However, the current SRCU grace period does have to wait for
-	 * that reader.  This is handled by invoking wait_idx() on the
-	 * non-active set of counters (hence sp->completed - 1).  Once
-	 * wait_idx() returns, we know that all readers that picked up
-	 * the old value of ->completed and that already incremented their
-	 * counter will have completed.
-	 *
-	 * But what about readers that picked up the old value of
-	 * ->completed, but -still- have not managed to increment their
-	 * counter?  We do not need to wait for those readers, because
-	 * they will have started their SRCU read-side critical section
-	 * after the current grace period starts.
-	 *
-	 * Because it is unlikely that readers will be preempted between
-	 * fetching ->completed and incrementing their counter, wait_idx()
-	 * will normally not need to wait.
-	 */
-	wait_idx(sp, (sp->completed - 1) & 0x1, trycount);
+	/* slow path */
 
-	/*
-	 * Now that wait_idx() has waited for the really old readers,
-	 *
-	 * Flip the readers' index by incrementing ->completed, then wait
-	 * until there are no more readers using the counters referenced by
-	 * the old index value.  (Recall that the index is the bottom bit
-	 * of ->completed.)
-	 *
-	 * Of course, it is possible that a reader might be delayed for the
-	 * full duration of flip_idx_and_wait() between fetching the
-	 * index and incrementing its counter.  This possibility is handled
-	 * by the next __synchronize_srcu() invoking wait_idx() for such
-	 * readers before starting a new grace period.
-	 */
-	srcu_flip(sp);
-	wait_idx(sp, (sp->completed - 1) & 0x1, trycount);
+	/* start state machine when this is the head */
+	if (!started)
+		schedule_delayed_work(&scp->work, 0);
+	spin_unlock_irq(&sp->gp_lock);
+	mutex_unlock(&sp->flip_check_mutex);
 
-	mutex_unlock(&sp->mutex);
+wait:
+	wait_for_completion(&sync.completion);
 }
 
 /**
@@ -410,6 +639,44 @@  void synchronize_srcu_expedited(struct srcu_struct *sp)
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
+void srcu_barrier(struct srcu_struct *sp)
+{
+	struct srcu_sync sync;
+	struct srcu_head *head = &sync.head;
+	unsigned long chck_seq; /* snap */
+
+	int idle_loop = 0;
+	int cpu;
+	struct srcu_cpu_struct *scp;
+
+	spin_lock_irq(&sp->gp_lock);
+	chck_seq = sp->chck_seq;
+	for_each_possible_cpu(cpu) {
+		scp = per_cpu_ptr(sp->srcu_per_cpu, cpu);
+		if (scp->head && !safe_less_than(chck_seq, scp->head->chck_seq,
+				sp->chck_seq)) {
+			/* this path is likely enterred only once */
+			init_completion(&sync.completion);
+			srcu_queue_callback(sp, scp, head,
+					__synchronize_srcu_callback);
+			/* don't need to wakeup the woken state machine */
+			spin_unlock_irq(&sp->gp_lock);
+			wait_for_completion(&sync.completion);
+			spin_lock_irq(&sp->gp_lock);
+		} else {
+			if ((++idle_loop & 0xF) == 0) {
+				spin_unlock_irq(&sp->gp_lock);
+				udelay(1);
+				spin_lock_irq(&sp->gp_lock);
+			}
+		}
+	}
+	spin_unlock_irq(&sp->gp_lock);
+
+	flush_workqueue(srcu_callback_wq);
+}
+EXPORT_SYMBOL_GPL(srcu_barrier);
+
 /**
  * srcu_batches_completed - return batches completed.
  * @sp: srcu_struct on which to report batch completion.
@@ -423,3 +690,10 @@  long srcu_batches_completed(struct srcu_struct *sp)
 	return sp->completed;
 }
 EXPORT_SYMBOL_GPL(srcu_batches_completed);
+
+__init int srcu_init(void)
+{
+	srcu_callback_wq = alloc_workqueue("srcu", 0, 0);
+	return srcu_callback_wq ? 0 : -1;
+}
+subsys_initcall(srcu_init);