[RFC] rcu: Make rcu_barrier() less disruptive

Message ID 20120315164839.GA1657@linux.vnet.ibm.com
State New
Headers show

Commit Message

Paul E. McKenney March 15, 2012, 4:48 p.m.
The rcu_barrier() primitive interrupts each and every CPU, registering
a callback on every CPU.  Once all of these callbacks have been invoked,
rcu_barrier() knows that every callback that was registered before
the call to rcu_barrier() has also been invoked.

However, there is no point in registering a callback on a CPU that
currently has no callbacks, most especially if that CPU is in a
deep idle state.  This commit therefore makes rcu_barrier() avoid
interrupting CPUs that have no callbacks.  Doing this requires reworking
the handling of orphaned callbacks, otherwise callbacks could slip through
rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
not yet interrupted to a CPU that rcu_barrier() had already interrupted.
This reworking was needed anyway to take a first step towards weaning
RCU from the CPU_DYING notifier's use of stop_cpu().

Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>

Comments

Mathieu Desnoyers March 15, 2012, 5:45 p.m. | #1
* Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> The rcu_barrier() primitive interrupts each and every CPU, registering
> a callback on every CPU.  Once all of these callbacks have been invoked,
> rcu_barrier() knows that every callback that was registered before
> the call to rcu_barrier() has also been invoked.
> 
> However, there is no point in registering a callback on a CPU that
> currently has no callbacks, most especially if that CPU is in a
> deep idle state.  This commit therefore makes rcu_barrier() avoid
> interrupting CPUs that have no callbacks.  Doing this requires reworking
> the handling of orphaned callbacks, otherwise callbacks could slip through
> rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> This reworking was needed anyway to take a first step towards weaning
> RCU from the CPU_DYING notifier's use of stop_cpu().

Quoting Documentation/RCU/rcubarrier.txt:

"We instead need the rcu_barrier() primitive. This primitive is similar
to synchronize_rcu(), but instead of waiting solely for a grace
period to elapse, it also waits for all outstanding RCU callbacks to
complete. Pseudo-code using rcu_barrier() is as follows:"

The patch you propose seems like a good approach to make rcu_barrier
less disruptive, but everyone needs to be aware that rcu_barrier() would
quit having the side-effect of doing the equivalent of
"synchronize_rcu()" from now on: within this new approach, in the case
where there are no pending callbacks, rcu_barrier() could, AFAIU, return
without waiting for the current grace period to complete.

Any use of rcu_barrier() that would assume that a synchronize_rcu() is
implicit with the rcu_barrier() execution would be a bug anyway, but
those might only show up after this patch is applied. I would therefore
recommend to audit all rcu_barrier() users to ensure none is expecting
rcu_barrier to act as a synchronize_rcu before pushing this change.

Thanks,

Mathieu

> 
> Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
> Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> 
> diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> index 403306b..8269656 100644
> --- a/kernel/rcutree.c
> +++ b/kernel/rcutree.c
> @@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS];
>  	.gpnum = -300, \
>  	.completed = -300, \
>  	.onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \
> +	.orphan_nxttail = &structname##_state.orphan_nxtlist, \
> +	.orphan_donetail = &structname##_state.orphan_donelist, \
>  	.fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \
>  	.n_force_qs = 0, \
>  	.n_force_qs_ngp = 0, \
> @@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp);
>  unsigned long rcutorture_testseq;
>  unsigned long rcutorture_vernum;
>  
> +/* State information for rcu_barrier() and friends. */
> +
> +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> +static atomic_t rcu_barrier_cpu_count;
> +static DEFINE_MUTEX(rcu_barrier_mutex);
> +static DECLARE_WAIT_QUEUE_HEAD(rcu_barrier_wq);
> +
>  /*
>   * Return true if an RCU grace period is in progress.  The ACCESS_ONCE()s
>   * permit this function to be invoked without holding the root rcu_node
> @@ -1311,7 +1320,60 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
>  #ifdef CONFIG_HOTPLUG_CPU
>  
>  /*
> - * Move a dying CPU's RCU callbacks to online CPU's callback list.
> + * Adopt the RCU callbacks from the specified rcu_state structure's
> + * orphanage.  The caller must hold the ->onofflock.
> + */
> +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> +{
> +	int i;
> +	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
> +
> +	/*
> +	 * If there is an rcu_barrier() operation in progress, then
> +	 * only the task doing that operation is permitted to adopt
> +	 * callbacks.  To do otherwise breaks rcu_barrier() and friends
> +	 * by causing them to fail to wait for the callbacks in the
> +	 * orphanage.
> +	 */
> +	if (rsp->rcu_barrier_in_progress &&
> +	    rsp->rcu_barrier_in_progress != current)
> +		return;
> +
> +	/* Do the accounting first. */
> +	rdp->qlen_lazy += rsp->qlen_lazy;
> +	rdp->qlen += rsp->qlen;
> +	rdp->n_cbs_adopted += rsp->qlen;
> +	rsp->qlen_lazy = 0;
> +	rsp->qlen = 0;
> +
> +	/*
> +	 * We do not need a memory barrier here because the only way we
> +	 * can get here if there is an rcu_barrier() in flight is if
> +	 * we are the task doing the rcu_barrier().
> +	 */
> +
> +	/* First adopt the ready-to-invoke callbacks. */
> +	if (rsp->orphan_donelist != NULL) {
> +		*rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
> +		*rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
> +		for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
> +			if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
> +				rdp->nxttail[i] = rsp->orphan_donetail;
> +		rsp->orphan_donelist = NULL;
> +		rsp->orphan_donetail = &rsp->orphan_donelist;
> +	}
> +
> +	/* And then adopt the callbacks that still need a grace period. */
> +	if (rsp->orphan_nxtlist != NULL) {
> +		*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
> +		rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
> +		rsp->orphan_nxtlist = NULL;
> +		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
> +	}
> +}
> +
> +/*
> + * Move a dying CPU's RCU callbacks to the rcu_state structure's orphanage.
>   * Also record a quiescent state for this CPU for the current grace period.
>   * Synchronization and interrupt disabling are not required because
>   * this function executes in stop_machine() context.  Therefore, cleanup
> @@ -1325,64 +1387,67 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
>  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
>  {
>  	int i;
> +	unsigned long flags;
>  	unsigned long mask;
> -	int receive_cpu = cpumask_any(cpu_online_mask);
> +	bool orphaned = 0;
>  	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
> -	struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu);
>  	RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */
>  
> -	/* First, adjust the counts. */
> +	/* Move the callbacks to the orphanage under ->onofflock protection. */
> +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> +
> +	/* First adjust the counts. */
>  	if (rdp->nxtlist != NULL) {
> -		receive_rdp->qlen_lazy += rdp->qlen_lazy;
> -		receive_rdp->qlen += rdp->qlen;
> +		rsp->qlen_lazy += rdp->qlen_lazy;
> +		rsp->qlen += rdp->qlen;
> +		rdp->n_cbs_orphaned += rdp->qlen;
>  		rdp->qlen_lazy = 0;
>  		rdp->qlen = 0;
> +		orphaned = 1;
>  	}
>  
>  	/*
> -	 * Next, move ready-to-invoke callbacks to be invoked on some
> -	 * other CPU.  These will not be required to pass through another
> -	 * grace period:  They are done, regardless of CPU.
> +	 * Next, move those callbacks still needing a grace period to
> +	 * the orphanage, where some other CPU will pick them up.
> +	 * Some of the callbacks might have gone partway through a grace
> +	 * period, but that is too bad.  They get to start over because we
> +	 * cannot assume that grace periods are synchronized across CPUs.
> +	 * We don't bother updating the ->nxttail[] array yet, instead
> +	 * we just reset the whole thing later on.
>  	 */
> -	if (rdp->nxtlist != NULL &&
> -	    rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) {
> -		struct rcu_head *oldhead;
> -		struct rcu_head **oldtail;
> -		struct rcu_head **newtail;
> -
> -		oldhead = rdp->nxtlist;
> -		oldtail = receive_rdp->nxttail[RCU_DONE_TAIL];
> -		rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
> -		*rdp->nxttail[RCU_DONE_TAIL] = *oldtail;
> -		*receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead;
> -		newtail = rdp->nxttail[RCU_DONE_TAIL];
> -		for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) {
> -			if (receive_rdp->nxttail[i] == oldtail)
> -				receive_rdp->nxttail[i] = newtail;
> -			if (rdp->nxttail[i] == newtail)
> -				rdp->nxttail[i] = &rdp->nxtlist;
> -		}
> +	if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
> +		*rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
> +		rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
> +		*rdp->nxttail[RCU_DONE_TAIL] = NULL;
>  	}
>  
>  	/*
> -	 * Finally, put the rest of the callbacks at the end of the list.
> -	 * The ones that made it partway through get to start over:  We
> -	 * cannot assume that grace periods are synchronized across CPUs.
> -	 * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but
> -	 * this does not seem compelling.  Not yet, anyway.)
> +	 * Then move the ready-to-invoke callbacks to the orphanage,
> +	 * where some other CPU will pick them up.  These will not be
> +	 * required to pass through another grace period: They are done.
>  	 */
>  	if (rdp->nxtlist != NULL) {
> -		*receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
> -		receive_rdp->nxttail[RCU_NEXT_TAIL] =
> -				rdp->nxttail[RCU_NEXT_TAIL];
> -		receive_rdp->n_cbs_adopted += rdp->qlen;
> -		rdp->n_cbs_orphaned += rdp->qlen;
> -
> -		rdp->nxtlist = NULL;
> -		for (i = 0; i < RCU_NEXT_SIZE; i++)
> -			rdp->nxttail[i] = &rdp->nxtlist;
> +		*rsp->orphan_donetail = rdp->nxtlist;
> +		rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
>  	}
>  
> +	/* Finally, initialize the rcu_data structure's list to empty.  */
> +	rdp->nxtlist = NULL;
> +	for (i = 0; i < RCU_NEXT_SIZE; i++)
> +		rdp->nxttail[i] = &rdp->nxtlist;
> +
> +	/*
> +	 * Wake up the rcu_barrier() task if there is one and if we
> +	 * actually sent anything to the orphanage.  Except that we
> +	 * must delay the wakeup until ->onofflock is released to
> +	 * avoid deadlock.
> +	 */
> +	if (!rsp->rcu_barrier_in_progress)
> +		orphaned = 0;
> +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> +	if (orphaned)
> +		wake_up(&rcu_barrier_wq);
> +
>  	/*
>  	 * Record a quiescent state for the dying CPU.  This is safe
>  	 * only because we have already cleared out the callbacks.
> @@ -1415,11 +1480,14 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
>  	rcu_stop_cpu_kthread(cpu);
>  	rcu_node_kthread_setaffinity(rnp, -1);
>  
> -	/* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */
> +	/* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */
>  
>  	/* Exclude any attempts to start a new grace period. */
>  	raw_spin_lock_irqsave(&rsp->onofflock, flags);
>  
> +	/* Collect the dead CPU's callbacks. */
> +	rcu_adopt_orphan_cbs(rsp);
> +
>  	/* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
>  	mask = rdp->grpmask;	/* rnp->grplo is constant. */
>  	do {
> @@ -1456,6 +1524,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
>  
>  #else /* #ifdef CONFIG_HOTPLUG_CPU */
>  
> +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> +{
> +}
> +
>  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
>  {
>  }
> @@ -1524,9 +1596,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
>  			    rcu_is_callbacks_kthread());
>  
>  	/* Update count, and requeue any remaining callbacks. */
> -	rdp->qlen_lazy -= count_lazy;
> -	rdp->qlen -= count;
> -	rdp->n_cbs_invoked += count;
>  	if (list != NULL) {
>  		*tail = rdp->nxtlist;
>  		rdp->nxtlist = list;
> @@ -1536,6 +1605,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
>  			else
>  				break;
>  	}
> +	smp_mb(); /* List handling before counting for rcu_barrier(). */
> +	rdp->qlen_lazy -= count_lazy;
> +	rdp->qlen -= count;
> +	rdp->n_cbs_invoked += count;
>  
>  	/* Reinstate batch limit if we have worked down the excess. */
>  	if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
> @@ -1824,13 +1897,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
>  	rdp = this_cpu_ptr(rsp->rda);
>  
>  	/* Add the callback to our list. */
> -	*rdp->nxttail[RCU_NEXT_TAIL] = head;
> -	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
>  	rdp->qlen++;
>  	if (lazy)
>  		rdp->qlen_lazy++;
>  	else
>  		rcu_idle_count_callbacks_posted();
> +	smp_mb();  /* Count before adding callback for rcu_barrier(). */
> +	*rdp->nxttail[RCU_NEXT_TAIL] = head;
> +	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
>  
>  	if (__is_kfree_rcu_offset((unsigned long)func))
>  		trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
> @@ -2169,15 +2243,10 @@ static int rcu_cpu_has_callbacks(int cpu)
>  	       rcu_preempt_cpu_has_callbacks(cpu);
>  }
>  
> -static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> -static atomic_t rcu_barrier_cpu_count;
> -static DEFINE_MUTEX(rcu_barrier_mutex);
> -static struct completion rcu_barrier_completion;
> -
>  static void rcu_barrier_callback(struct rcu_head *notused)
>  {
>  	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> -		complete(&rcu_barrier_completion);
> +		wake_up(&rcu_barrier_wq);
>  }
>  
>  /*
> @@ -2203,27 +2272,121 @@ static void _rcu_barrier(struct rcu_state *rsp,
>  			 void (*call_rcu_func)(struct rcu_head *head,
>  					       void (*func)(struct rcu_head *head)))
>  {
> -	BUG_ON(in_interrupt());
> +	int cpu;
> +	unsigned long flags;
> +	struct rcu_data *rdp;
> +	struct rcu_head rh;
> +
> +	init_rcu_head_on_stack(&rh);
> +
>  	/* Take mutex to serialize concurrent rcu_barrier() requests. */
>  	mutex_lock(&rcu_barrier_mutex);
> -	init_completion(&rcu_barrier_completion);
> +
> +	smp_mb();  /* Prevent any prior operations from leaking in. */
> +
>  	/*
> -	 * Initialize rcu_barrier_cpu_count to 1, then invoke
> -	 * rcu_barrier_func() on each CPU, so that each CPU also has
> -	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
> -	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
> -	 * might complete its grace period before all of the other CPUs
> -	 * did their increment, causing this function to return too
> -	 * early.  Note that on_each_cpu() disables irqs, which prevents
> -	 * any CPUs from coming online or going offline until each online
> -	 * CPU has queued its RCU-barrier callback.
> +	 * Initialize the count to one rather than to zero in order to
> +	 * avoid a too-soon return to zero in case of a short grace period
> +	 * (or preemption of this task).  Also flag this task as doing
> +	 * an rcu_barrier().  This will prevent anyone else from adopting
> +	 * orphaned callbacks, which could otherwise cause failure if a
> +	 * CPU went offline and quickly came back online.  To see this,
> +	 * consider the following sequence of events:
> +	 *
> +	 * 1.	We cause CPU 0 to post an rcu_barrier_callback() callback.
> +	 * 2.	CPU 1 goes offline, orphaning its callbacks.
> +	 * 3.	CPU 0 adopts CPU 1's orphaned callbacks.
> +	 * 4.	CPU 1 comes back online.
> +	 * 5.	We cause CPU 1 to post an rcu_barrier_callback() callback.
> +	 * 6.	Both rcu_barrier_callback() callbacks are invoked, awakening
> +	 *	us -- but before CPU 1's orphaned callbacks are invoked!!!
>  	 */
>  	atomic_set(&rcu_barrier_cpu_count, 1);
> -	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
> -	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> -		complete(&rcu_barrier_completion);
> -	wait_for_completion(&rcu_barrier_completion);
> +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> +	rsp->rcu_barrier_in_progress = current;
> +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> +
> +	/*
> +	 * Force every CPU with callbacks to register a new callback
> +	 * that will tell us when all the preceding callbacks have
> +	 * been invoked.  If an offline CPU has callbacks, wait for
> +	 * it to either come back online or to finish orphaning those
> +	 * callbacks.
> +	 */
> +	for_each_possible_cpu(cpu) {
> +		preempt_disable();
> +		rdp = per_cpu_ptr(rsp->rda, cpu);
> +		if (cpu_is_offline(cpu)) {
> +			preempt_enable();
> +			while (cpu_is_offline(cpu) &&
> +			       ACCESS_ONCE(rdp->qlen))
> +				schedule_timeout_interruptible(1);
> +		} else if (ACCESS_ONCE(rdp->qlen)) {
> +			smp_call_function_single(cpu, rcu_barrier_func,
> +						 (void *)call_rcu_func, 1);
> +			preempt_enable();
> +		}
> +	}
> +
> +	/*
> +	 * Force any ongoing CPU-hotplug operations to complete,
> +	 * so that any callbacks from the outgoing CPUs are now in
> +	 * the orphanage.
> +	 */
> +	cpu_maps_update_begin();
> +	cpu_maps_update_done();
> +
> +	/*
> +	 * Now that all online CPUs have rcu_barrier_callback() callbacks
> +	 * posted, we can adopt all of the orphaned callbacks and place
> +	 * an rcu_barrier_callback() callback after them.  When that is done,
> +	 * we are guaranteed to have an rcu_barrier_callback() callback
> +	 * following every callback that could possibly have been
> +	 * registered before _rcu_barrier() was called.
> +	 */
> +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> +	rcu_adopt_orphan_cbs(rsp);
> +	atomic_inc(&rcu_barrier_cpu_count);
> +	call_rcu_func(&rh, rcu_barrier_callback);
> +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> +
> +	/*
> +	 * Now that we have an rcu_barrier_callback() callback on each
> +	 * CPU, and thus each counted, remove the initial count.
> +	 */
> +	atomic_dec(&rcu_barrier_cpu_count);
> +	smp_mb__after_atomic_dec();
> +
> +	/*
> +	 * Loop waiting for all rcu_barrier_callback() callbacks to be
> +	 * invoked.  Adopt any orphaned callbacks in the meantime, just
> +	 * in case one of the rcu_barrier_callback() callbacks is orphaned.
> +	 */
> +	while (atomic_read(&rcu_barrier_cpu_count) > 0) {
> +		wait_event(rcu_barrier_wq,
> +			   atomic_read(&rcu_barrier_cpu_count) == 0 ||
> +			   ACCESS_ONCE(rsp->qlen));
> +		if (ACCESS_ONCE(rsp->qlen)) {
> +			raw_spin_lock_irqsave(&rsp->onofflock, flags);
> +			rcu_adopt_orphan_cbs(rsp);
> +			raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> +		}
> +	}
> +
> +	/*
> +	 * Done, so let others adopt orphaned callbacks.  But avoid
> +	 * indefinite postponement of any additional orphans by adopting
> +	 * one more time.
> +	 */
> +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> +	rcu_adopt_orphan_cbs(rsp);
> +	rsp->rcu_barrier_in_progress = NULL;
> +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> +
> +	/* Other rcu_barrier() invocations can now safely proceed. */
>  	mutex_unlock(&rcu_barrier_mutex);
> +
> +	destroy_rcu_head_on_stack(&rh);
>  }
>  
>  /**
> diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> index 36ca28e..1e49c56 100644
> --- a/kernel/rcutree.h
> +++ b/kernel/rcutree.h
> @@ -371,6 +371,17 @@ struct rcu_state {
>  
>  	raw_spinlock_t onofflock;		/* exclude on/offline and */
>  						/*  starting new GP. */
> +	struct rcu_head *orphan_nxtlist;	/* Orphaned callbacks that */
> +						/*  need a grace period. */
> +	struct rcu_head **orphan_nxttail;	/* Tail of above. */
> +	struct rcu_head *orphan_donelist;	/* Orphaned callbacks that */
> +						/*  are ready to invoke. */
> +	struct rcu_head **orphan_donetail;	/* Tail of above. */
> +	long qlen_lazy;				/* Number of lazy callbacks. */
> +	long qlen;				/* Total number of callbacks. */
> +	struct task_struct *rcu_barrier_in_progress;
> +						/* Task doing rcu_barrier(), */
> +						/*  or NULL if no barrier. */
>  	raw_spinlock_t fqslock;			/* Only one task forcing */
>  						/*  quiescent states. */
>  	unsigned long jiffies_force_qs;		/* Time at which to invoke */
> diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> index ed459ed..d4bc16d 100644
> --- a/kernel/rcutree_trace.c
> +++ b/kernel/rcutree_trace.c
> @@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
>  
>  	gpnum = rsp->gpnum;
>  	seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x "
> -		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
> +		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
>  		   rsp->completed, gpnum, rsp->fqs_state,
>  		   (long)(rsp->jiffies_force_qs - jiffies),
>  		   (int)(jiffies & 0xffff),
>  		   rsp->n_force_qs, rsp->n_force_qs_ngp,
>  		   rsp->n_force_qs - rsp->n_force_qs_ngp,
> -		   rsp->n_force_qs_lh);
> +		   rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen);
>  	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
>  		if (rnp->level != level) {
>  			seq_puts(m, "\n");
>
Paul E. McKenney March 15, 2012, 6:21 p.m. | #2
On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote:
> * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > The rcu_barrier() primitive interrupts each and every CPU, registering
> > a callback on every CPU.  Once all of these callbacks have been invoked,
> > rcu_barrier() knows that every callback that was registered before
> > the call to rcu_barrier() has also been invoked.
> > 
> > However, there is no point in registering a callback on a CPU that
> > currently has no callbacks, most especially if that CPU is in a
> > deep idle state.  This commit therefore makes rcu_barrier() avoid
> > interrupting CPUs that have no callbacks.  Doing this requires reworking
> > the handling of orphaned callbacks, otherwise callbacks could slip through
> > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> > not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> > This reworking was needed anyway to take a first step towards weaning
> > RCU from the CPU_DYING notifier's use of stop_cpu().
> 
> Quoting Documentation/RCU/rcubarrier.txt:
> 
> "We instead need the rcu_barrier() primitive. This primitive is similar
> to synchronize_rcu(), but instead of waiting solely for a grace
> period to elapse, it also waits for all outstanding RCU callbacks to
> complete. Pseudo-code using rcu_barrier() is as follows:"
> 
> The patch you propose seems like a good approach to make rcu_barrier
> less disruptive, but everyone needs to be aware that rcu_barrier() would
> quit having the side-effect of doing the equivalent of
> "synchronize_rcu()" from now on: within this new approach, in the case
> where there are no pending callbacks, rcu_barrier() could, AFAIU, return
> without waiting for the current grace period to complete.
> 
> Any use of rcu_barrier() that would assume that a synchronize_rcu() is
> implicit with the rcu_barrier() execution would be a bug anyway, but
> those might only show up after this patch is applied. I would therefore
> recommend to audit all rcu_barrier() users to ensure none is expecting
> rcu_barrier to act as a synchronize_rcu before pushing this change.

Good catch!

I am going to chicken out and explicitly wait for a grace period if there
were no callbacks.  Having rcu_barrier() very rarely be a quick no-op does
sound like a standing invitation for subtle non-reproducible bugs.  ;-)

							Thanx, Paul

> Thanks,
> 
> Mathieu
> 
> > 
> > Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
> > Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> > 
> > diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> > index 403306b..8269656 100644
> > --- a/kernel/rcutree.c
> > +++ b/kernel/rcutree.c
> > @@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS];
> >  	.gpnum = -300, \
> >  	.completed = -300, \
> >  	.onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \
> > +	.orphan_nxttail = &structname##_state.orphan_nxtlist, \
> > +	.orphan_donetail = &structname##_state.orphan_donelist, \
> >  	.fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \
> >  	.n_force_qs = 0, \
> >  	.n_force_qs_ngp = 0, \
> > @@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp);
> >  unsigned long rcutorture_testseq;
> >  unsigned long rcutorture_vernum;
> >  
> > +/* State information for rcu_barrier() and friends. */
> > +
> > +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> > +static atomic_t rcu_barrier_cpu_count;
> > +static DEFINE_MUTEX(rcu_barrier_mutex);
> > +static DECLARE_WAIT_QUEUE_HEAD(rcu_barrier_wq);
> > +
> >  /*
> >   * Return true if an RCU grace period is in progress.  The ACCESS_ONCE()s
> >   * permit this function to be invoked without holding the root rcu_node
> > @@ -1311,7 +1320,60 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
> >  #ifdef CONFIG_HOTPLUG_CPU
> >  
> >  /*
> > - * Move a dying CPU's RCU callbacks to online CPU's callback list.
> > + * Adopt the RCU callbacks from the specified rcu_state structure's
> > + * orphanage.  The caller must hold the ->onofflock.
> > + */
> > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > +{
> > +	int i;
> > +	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
> > +
> > +	/*
> > +	 * If there is an rcu_barrier() operation in progress, then
> > +	 * only the task doing that operation is permitted to adopt
> > +	 * callbacks.  To do otherwise breaks rcu_barrier() and friends
> > +	 * by causing them to fail to wait for the callbacks in the
> > +	 * orphanage.
> > +	 */
> > +	if (rsp->rcu_barrier_in_progress &&
> > +	    rsp->rcu_barrier_in_progress != current)
> > +		return;
> > +
> > +	/* Do the accounting first. */
> > +	rdp->qlen_lazy += rsp->qlen_lazy;
> > +	rdp->qlen += rsp->qlen;
> > +	rdp->n_cbs_adopted += rsp->qlen;
> > +	rsp->qlen_lazy = 0;
> > +	rsp->qlen = 0;
> > +
> > +	/*
> > +	 * We do not need a memory barrier here because the only way we
> > +	 * can get here if there is an rcu_barrier() in flight is if
> > +	 * we are the task doing the rcu_barrier().
> > +	 */
> > +
> > +	/* First adopt the ready-to-invoke callbacks. */
> > +	if (rsp->orphan_donelist != NULL) {
> > +		*rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
> > +		*rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
> > +		for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
> > +			if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
> > +				rdp->nxttail[i] = rsp->orphan_donetail;
> > +		rsp->orphan_donelist = NULL;
> > +		rsp->orphan_donetail = &rsp->orphan_donelist;
> > +	}
> > +
> > +	/* And then adopt the callbacks that still need a grace period. */
> > +	if (rsp->orphan_nxtlist != NULL) {
> > +		*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
> > +		rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
> > +		rsp->orphan_nxtlist = NULL;
> > +		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
> > +	}
> > +}
> > +
> > +/*
> > + * Move a dying CPU's RCU callbacks to the rcu_state structure's orphanage.
> >   * Also record a quiescent state for this CPU for the current grace period.
> >   * Synchronization and interrupt disabling are not required because
> >   * this function executes in stop_machine() context.  Therefore, cleanup
> > @@ -1325,64 +1387,67 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
> >  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
> >  {
> >  	int i;
> > +	unsigned long flags;
> >  	unsigned long mask;
> > -	int receive_cpu = cpumask_any(cpu_online_mask);
> > +	bool orphaned = 0;
> >  	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
> > -	struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu);
> >  	RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */
> >  
> > -	/* First, adjust the counts. */
> > +	/* Move the callbacks to the orphanage under ->onofflock protection. */
> > +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +
> > +	/* First adjust the counts. */
> >  	if (rdp->nxtlist != NULL) {
> > -		receive_rdp->qlen_lazy += rdp->qlen_lazy;
> > -		receive_rdp->qlen += rdp->qlen;
> > +		rsp->qlen_lazy += rdp->qlen_lazy;
> > +		rsp->qlen += rdp->qlen;
> > +		rdp->n_cbs_orphaned += rdp->qlen;
> >  		rdp->qlen_lazy = 0;
> >  		rdp->qlen = 0;
> > +		orphaned = 1;
> >  	}
> >  
> >  	/*
> > -	 * Next, move ready-to-invoke callbacks to be invoked on some
> > -	 * other CPU.  These will not be required to pass through another
> > -	 * grace period:  They are done, regardless of CPU.
> > +	 * Next, move those callbacks still needing a grace period to
> > +	 * the orphanage, where some other CPU will pick them up.
> > +	 * Some of the callbacks might have gone partway through a grace
> > +	 * period, but that is too bad.  They get to start over because we
> > +	 * cannot assume that grace periods are synchronized across CPUs.
> > +	 * We don't bother updating the ->nxttail[] array yet, instead
> > +	 * we just reset the whole thing later on.
> >  	 */
> > -	if (rdp->nxtlist != NULL &&
> > -	    rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) {
> > -		struct rcu_head *oldhead;
> > -		struct rcu_head **oldtail;
> > -		struct rcu_head **newtail;
> > -
> > -		oldhead = rdp->nxtlist;
> > -		oldtail = receive_rdp->nxttail[RCU_DONE_TAIL];
> > -		rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
> > -		*rdp->nxttail[RCU_DONE_TAIL] = *oldtail;
> > -		*receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead;
> > -		newtail = rdp->nxttail[RCU_DONE_TAIL];
> > -		for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) {
> > -			if (receive_rdp->nxttail[i] == oldtail)
> > -				receive_rdp->nxttail[i] = newtail;
> > -			if (rdp->nxttail[i] == newtail)
> > -				rdp->nxttail[i] = &rdp->nxtlist;
> > -		}
> > +	if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
> > +		*rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
> > +		rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
> > +		*rdp->nxttail[RCU_DONE_TAIL] = NULL;
> >  	}
> >  
> >  	/*
> > -	 * Finally, put the rest of the callbacks at the end of the list.
> > -	 * The ones that made it partway through get to start over:  We
> > -	 * cannot assume that grace periods are synchronized across CPUs.
> > -	 * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but
> > -	 * this does not seem compelling.  Not yet, anyway.)
> > +	 * Then move the ready-to-invoke callbacks to the orphanage,
> > +	 * where some other CPU will pick them up.  These will not be
> > +	 * required to pass through another grace period: They are done.
> >  	 */
> >  	if (rdp->nxtlist != NULL) {
> > -		*receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
> > -		receive_rdp->nxttail[RCU_NEXT_TAIL] =
> > -				rdp->nxttail[RCU_NEXT_TAIL];
> > -		receive_rdp->n_cbs_adopted += rdp->qlen;
> > -		rdp->n_cbs_orphaned += rdp->qlen;
> > -
> > -		rdp->nxtlist = NULL;
> > -		for (i = 0; i < RCU_NEXT_SIZE; i++)
> > -			rdp->nxttail[i] = &rdp->nxtlist;
> > +		*rsp->orphan_donetail = rdp->nxtlist;
> > +		rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
> >  	}
> >  
> > +	/* Finally, initialize the rcu_data structure's list to empty.  */
> > +	rdp->nxtlist = NULL;
> > +	for (i = 0; i < RCU_NEXT_SIZE; i++)
> > +		rdp->nxttail[i] = &rdp->nxtlist;
> > +
> > +	/*
> > +	 * Wake up the rcu_barrier() task if there is one and if we
> > +	 * actually sent anything to the orphanage.  Except that we
> > +	 * must delay the wakeup until ->onofflock is released to
> > +	 * avoid deadlock.
> > +	 */
> > +	if (!rsp->rcu_barrier_in_progress)
> > +		orphaned = 0;
> > +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +	if (orphaned)
> > +		wake_up(&rcu_barrier_wq);
> > +
> >  	/*
> >  	 * Record a quiescent state for the dying CPU.  This is safe
> >  	 * only because we have already cleared out the callbacks.
> > @@ -1415,11 +1480,14 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
> >  	rcu_stop_cpu_kthread(cpu);
> >  	rcu_node_kthread_setaffinity(rnp, -1);
> >  
> > -	/* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */
> > +	/* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */
> >  
> >  	/* Exclude any attempts to start a new grace period. */
> >  	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> >  
> > +	/* Collect the dead CPU's callbacks. */
> > +	rcu_adopt_orphan_cbs(rsp);
> > +
> >  	/* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
> >  	mask = rdp->grpmask;	/* rnp->grplo is constant. */
> >  	do {
> > @@ -1456,6 +1524,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
> >  
> >  #else /* #ifdef CONFIG_HOTPLUG_CPU */
> >  
> > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > +{
> > +}
> > +
> >  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
> >  {
> >  }
> > @@ -1524,9 +1596,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> >  			    rcu_is_callbacks_kthread());
> >  
> >  	/* Update count, and requeue any remaining callbacks. */
> > -	rdp->qlen_lazy -= count_lazy;
> > -	rdp->qlen -= count;
> > -	rdp->n_cbs_invoked += count;
> >  	if (list != NULL) {
> >  		*tail = rdp->nxtlist;
> >  		rdp->nxtlist = list;
> > @@ -1536,6 +1605,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> >  			else
> >  				break;
> >  	}
> > +	smp_mb(); /* List handling before counting for rcu_barrier(). */
> > +	rdp->qlen_lazy -= count_lazy;
> > +	rdp->qlen -= count;
> > +	rdp->n_cbs_invoked += count;
> >  
> >  	/* Reinstate batch limit if we have worked down the excess. */
> >  	if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
> > @@ -1824,13 +1897,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
> >  	rdp = this_cpu_ptr(rsp->rda);
> >  
> >  	/* Add the callback to our list. */
> > -	*rdp->nxttail[RCU_NEXT_TAIL] = head;
> > -	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
> >  	rdp->qlen++;
> >  	if (lazy)
> >  		rdp->qlen_lazy++;
> >  	else
> >  		rcu_idle_count_callbacks_posted();
> > +	smp_mb();  /* Count before adding callback for rcu_barrier(). */
> > +	*rdp->nxttail[RCU_NEXT_TAIL] = head;
> > +	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
> >  
> >  	if (__is_kfree_rcu_offset((unsigned long)func))
> >  		trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
> > @@ -2169,15 +2243,10 @@ static int rcu_cpu_has_callbacks(int cpu)
> >  	       rcu_preempt_cpu_has_callbacks(cpu);
> >  }
> >  
> > -static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> > -static atomic_t rcu_barrier_cpu_count;
> > -static DEFINE_MUTEX(rcu_barrier_mutex);
> > -static struct completion rcu_barrier_completion;
> > -
> >  static void rcu_barrier_callback(struct rcu_head *notused)
> >  {
> >  	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> > -		complete(&rcu_barrier_completion);
> > +		wake_up(&rcu_barrier_wq);
> >  }
> >  
> >  /*
> > @@ -2203,27 +2272,121 @@ static void _rcu_barrier(struct rcu_state *rsp,
> >  			 void (*call_rcu_func)(struct rcu_head *head,
> >  					       void (*func)(struct rcu_head *head)))
> >  {
> > -	BUG_ON(in_interrupt());
> > +	int cpu;
> > +	unsigned long flags;
> > +	struct rcu_data *rdp;
> > +	struct rcu_head rh;
> > +
> > +	init_rcu_head_on_stack(&rh);
> > +
> >  	/* Take mutex to serialize concurrent rcu_barrier() requests. */
> >  	mutex_lock(&rcu_barrier_mutex);
> > -	init_completion(&rcu_barrier_completion);
> > +
> > +	smp_mb();  /* Prevent any prior operations from leaking in. */
> > +
> >  	/*
> > -	 * Initialize rcu_barrier_cpu_count to 1, then invoke
> > -	 * rcu_barrier_func() on each CPU, so that each CPU also has
> > -	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
> > -	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
> > -	 * might complete its grace period before all of the other CPUs
> > -	 * did their increment, causing this function to return too
> > -	 * early.  Note that on_each_cpu() disables irqs, which prevents
> > -	 * any CPUs from coming online or going offline until each online
> > -	 * CPU has queued its RCU-barrier callback.
> > +	 * Initialize the count to one rather than to zero in order to
> > +	 * avoid a too-soon return to zero in case of a short grace period
> > +	 * (or preemption of this task).  Also flag this task as doing
> > +	 * an rcu_barrier().  This will prevent anyone else from adopting
> > +	 * orphaned callbacks, which could otherwise cause failure if a
> > +	 * CPU went offline and quickly came back online.  To see this,
> > +	 * consider the following sequence of events:
> > +	 *
> > +	 * 1.	We cause CPU 0 to post an rcu_barrier_callback() callback.
> > +	 * 2.	CPU 1 goes offline, orphaning its callbacks.
> > +	 * 3.	CPU 0 adopts CPU 1's orphaned callbacks.
> > +	 * 4.	CPU 1 comes back online.
> > +	 * 5.	We cause CPU 1 to post an rcu_barrier_callback() callback.
> > +	 * 6.	Both rcu_barrier_callback() callbacks are invoked, awakening
> > +	 *	us -- but before CPU 1's orphaned callbacks are invoked!!!
> >  	 */
> >  	atomic_set(&rcu_barrier_cpu_count, 1);
> > -	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
> > -	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> > -		complete(&rcu_barrier_completion);
> > -	wait_for_completion(&rcu_barrier_completion);
> > +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +	rsp->rcu_barrier_in_progress = current;
> > +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +
> > +	/*
> > +	 * Force every CPU with callbacks to register a new callback
> > +	 * that will tell us when all the preceding callbacks have
> > +	 * been invoked.  If an offline CPU has callbacks, wait for
> > +	 * it to either come back online or to finish orphaning those
> > +	 * callbacks.
> > +	 */
> > +	for_each_possible_cpu(cpu) {
> > +		preempt_disable();
> > +		rdp = per_cpu_ptr(rsp->rda, cpu);
> > +		if (cpu_is_offline(cpu)) {
> > +			preempt_enable();
> > +			while (cpu_is_offline(cpu) &&
> > +			       ACCESS_ONCE(rdp->qlen))
> > +				schedule_timeout_interruptible(1);
> > +		} else if (ACCESS_ONCE(rdp->qlen)) {
> > +			smp_call_function_single(cpu, rcu_barrier_func,
> > +						 (void *)call_rcu_func, 1);
> > +			preempt_enable();
> > +		}
> > +	}
> > +
> > +	/*
> > +	 * Force any ongoing CPU-hotplug operations to complete,
> > +	 * so that any callbacks from the outgoing CPUs are now in
> > +	 * the orphanage.
> > +	 */
> > +	cpu_maps_update_begin();
> > +	cpu_maps_update_done();
> > +
> > +	/*
> > +	 * Now that all online CPUs have rcu_barrier_callback() callbacks
> > +	 * posted, we can adopt all of the orphaned callbacks and place
> > +	 * an rcu_barrier_callback() callback after them.  When that is done,
> > +	 * we are guaranteed to have an rcu_barrier_callback() callback
> > +	 * following every callback that could possibly have been
> > +	 * registered before _rcu_barrier() was called.
> > +	 */
> > +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +	rcu_adopt_orphan_cbs(rsp);
> > +	atomic_inc(&rcu_barrier_cpu_count);
> > +	call_rcu_func(&rh, rcu_barrier_callback);
> > +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +
> > +	/*
> > +	 * Now that we have an rcu_barrier_callback() callback on each
> > +	 * CPU, and thus each counted, remove the initial count.
> > +	 */
> > +	atomic_dec(&rcu_barrier_cpu_count);
> > +	smp_mb__after_atomic_dec();
> > +
> > +	/*
> > +	 * Loop waiting for all rcu_barrier_callback() callbacks to be
> > +	 * invoked.  Adopt any orphaned callbacks in the meantime, just
> > +	 * in case one of the rcu_barrier_callback() callbacks is orphaned.
> > +	 */
> > +	while (atomic_read(&rcu_barrier_cpu_count) > 0) {
> > +		wait_event(rcu_barrier_wq,
> > +			   atomic_read(&rcu_barrier_cpu_count) == 0 ||
> > +			   ACCESS_ONCE(rsp->qlen));
> > +		if (ACCESS_ONCE(rsp->qlen)) {
> > +			raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +			rcu_adopt_orphan_cbs(rsp);
> > +			raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +		}
> > +	}
> > +
> > +	/*
> > +	 * Done, so let others adopt orphaned callbacks.  But avoid
> > +	 * indefinite postponement of any additional orphans by adopting
> > +	 * one more time.
> > +	 */
> > +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +	rcu_adopt_orphan_cbs(rsp);
> > +	rsp->rcu_barrier_in_progress = NULL;
> > +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +
> > +	/* Other rcu_barrier() invocations can now safely proceed. */
> >  	mutex_unlock(&rcu_barrier_mutex);
> > +
> > +	destroy_rcu_head_on_stack(&rh);
> >  }
> >  
> >  /**
> > diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> > index 36ca28e..1e49c56 100644
> > --- a/kernel/rcutree.h
> > +++ b/kernel/rcutree.h
> > @@ -371,6 +371,17 @@ struct rcu_state {
> >  
> >  	raw_spinlock_t onofflock;		/* exclude on/offline and */
> >  						/*  starting new GP. */
> > +	struct rcu_head *orphan_nxtlist;	/* Orphaned callbacks that */
> > +						/*  need a grace period. */
> > +	struct rcu_head **orphan_nxttail;	/* Tail of above. */
> > +	struct rcu_head *orphan_donelist;	/* Orphaned callbacks that */
> > +						/*  are ready to invoke. */
> > +	struct rcu_head **orphan_donetail;	/* Tail of above. */
> > +	long qlen_lazy;				/* Number of lazy callbacks. */
> > +	long qlen;				/* Total number of callbacks. */
> > +	struct task_struct *rcu_barrier_in_progress;
> > +						/* Task doing rcu_barrier(), */
> > +						/*  or NULL if no barrier. */
> >  	raw_spinlock_t fqslock;			/* Only one task forcing */
> >  						/*  quiescent states. */
> >  	unsigned long jiffies_force_qs;		/* Time at which to invoke */
> > diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> > index ed459ed..d4bc16d 100644
> > --- a/kernel/rcutree_trace.c
> > +++ b/kernel/rcutree_trace.c
> > @@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
> >  
> >  	gpnum = rsp->gpnum;
> >  	seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x "
> > -		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
> > +		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
> >  		   rsp->completed, gpnum, rsp->fqs_state,
> >  		   (long)(rsp->jiffies_force_qs - jiffies),
> >  		   (int)(jiffies & 0xffff),
> >  		   rsp->n_force_qs, rsp->n_force_qs_ngp,
> >  		   rsp->n_force_qs - rsp->n_force_qs_ngp,
> > -		   rsp->n_force_qs_lh);
> > +		   rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen);
> >  	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
> >  		if (rnp->level != level) {
> >  			seq_puts(m, "\n");
> > 
> 
> -- 
> Mathieu Desnoyers
> Operating System Efficiency R&D Consultant
> EfficiOS Inc.
> http://www.efficios.com
>
Paul E. McKenney March 15, 2012, 6:31 p.m. | #3
On Thu, Mar 15, 2012 at 11:21:59AM -0700, Paul E. McKenney wrote:
> On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote:
> > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > > The rcu_barrier() primitive interrupts each and every CPU, registering
> > > a callback on every CPU.  Once all of these callbacks have been invoked,
> > > rcu_barrier() knows that every callback that was registered before
> > > the call to rcu_barrier() has also been invoked.
> > > 
> > > However, there is no point in registering a callback on a CPU that
> > > currently has no callbacks, most especially if that CPU is in a
> > > deep idle state.  This commit therefore makes rcu_barrier() avoid
> > > interrupting CPUs that have no callbacks.  Doing this requires reworking
> > > the handling of orphaned callbacks, otherwise callbacks could slip through
> > > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> > > not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> > > This reworking was needed anyway to take a first step towards weaning
> > > RCU from the CPU_DYING notifier's use of stop_cpu().
> > 
> > Quoting Documentation/RCU/rcubarrier.txt:
> > 
> > "We instead need the rcu_barrier() primitive. This primitive is similar
> > to synchronize_rcu(), but instead of waiting solely for a grace
> > period to elapse, it also waits for all outstanding RCU callbacks to
> > complete. Pseudo-code using rcu_barrier() is as follows:"
> > 
> > The patch you propose seems like a good approach to make rcu_barrier
> > less disruptive, but everyone need to be aware that rcu_barrier() would
> > quit having the side-effect of doing the equivalent of
> > "synchronize_rcu()" from now on: within this new approach, in the case
> > where there are no pending callbacks, rcu_barrier() could, AFAIU, return
> > without waiting for the current grace period to complete.
> > 
> > Any use of rcu_barrier() that would assume that a synchronize_rcu() is
> > implicit with the rcu_barrier() execution would be a bug anyway, but
> > those might only show up after this patch is applied. I would therefore
> > recommend to audit all rcu_barrier() users to ensure none is expecting
> > rcu_barrier to act as a synchronize_rcu before pushing this change.
> 
> Good catch!
> 
> I am going to chicken out and explicitly wait for a grace period if there
> were no callbacks.  Having rcu_barrier() very rarely be a quick no-op does
> sound like a standing invitation for subtle non-reproducible bugs.  ;-)

I take it back...

After adopting callbacks (rcu_adopt_orphan_cbs()), _rcu_barrier()
unconditionally posts a callback on the current CPU and waits for it.
So _rcu_barrier() actually does always wait for a grace period.

Yes, I could be more dainty and make rcu_adopt_orphan_cbs() return an
indication of whether there were any callbacks, and then post the callback
only if either there were some callbacks adopted or if there were no calls
to smp_call_function_single().  But that adds complexity for almost no
benefit -- and no one can accuse _rcu_barrier() of being a fastpath!  ;-)

Or am I missing something here?

							Thanx, Paul
Mathieu Desnoyers March 15, 2012, 6:50 p.m. | #4
* Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> On Thu, Mar 15, 2012 at 11:21:59AM -0700, Paul E. McKenney wrote:
> > On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote:
> > > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > > > The rcu_barrier() primitive interrupts each and every CPU, registering
> > > > a callback on every CPU.  Once all of these callbacks have been invoked,
> > > > rcu_barrier() knows that every callback that was registered before
> > > > the call to rcu_barrier() has also been invoked.
> > > > 
> > > > However, there is no point in registering a callback on a CPU that
> > > > currently has no callbacks, most especially if that CPU is in a
> > > > deep idle state.  This commit therefore makes rcu_barrier() avoid
> > > > interrupting CPUs that have no callbacks.  Doing this requires reworking
> > > > the handling of orphaned callbacks, otherwise callbacks could slip through
> > > > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> > > > not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> > > > This reworking was needed anyway to take a first step towards weaning
> > > > RCU from the CPU_DYING notifier's use of stop_cpu().
> > > 
> > > Quoting Documentation/RCU/rcubarrier.txt:
> > > 
> > > "We instead need the rcu_barrier() primitive. This primitive is similar
> > > to synchronize_rcu(), but instead of waiting solely for a grace
> > > period to elapse, it also waits for all outstanding RCU callbacks to
> > > complete. Pseudo-code using rcu_barrier() is as follows:"
> > > 
> > > The patch you propose seems like a good approach to make rcu_barrier
> > > less disruptive, but everyone need to be aware that rcu_barrier() would
> > > quit having the side-effect of doing the equivalent of
> > > "synchronize_rcu()" from now on: within this new approach, in the case
> > > where there are no pending callbacks, rcu_barrier() could, AFAIU, return
> > > without waiting for the current grace period to complete.
> > > 
> > > Any use of rcu_barrier() that would assume that a synchronize_rcu() is
> > > implicit with the rcu_barrier() execution would be a bug anyway, but
> > > those might only show up after this patch is applied. I would therefore
> > > recommend to audit all rcu_barrier() users to ensure none is expecting
> > > rcu_barrier to act as a synchronize_rcu before pushing this change.
> > 
> > Good catch!
> > 
> > I am going to chicken out and explicitly wait for a grace period if there
> > were no callbacks.  Having rcu_barrier() very rarely be a quick no-op does
> > sound like a standing invitation for subtle non-reproducible bugs.  ;-)
> 
> I take it back...
> 
> After adopting callbacks (rcu_adopt_orphan_cbs()), _rcu_barrier()
> unconditionally posts a callback on the current CPU and waits for it.
> So _rcu_barrier() actually does always wait for a grace period.

Ah ok, that should handle it then.

> 
> Yes, I could be more dainty and make rcu_adopt_orphan_cbs() return an
> indication of whether there were any callbacks, and then post the callback
> only if either there were some callbacks adopted or if there were no calls
> to smp_call_function_single().  But that adds complexity for almost no
> benefit -- and no one can accuse _rcu_barrier() of being a fastpath!  ;-)
> 
> Or am I missing something here?

Nope, I think it all makes sense.

Thanks,

Mathieu

> 
> 							Thanx, Paul
>
Srivatsa S. Bhat March 19, 2012, 2:23 p.m. | #5
Hi Paul,

On 03/15/2012 10:18 PM, Paul E. McKenney wrote:

> The rcu_barrier() primitive interrupts each and every CPU, registering
> a callback on every CPU.  Once all of these callbacks have been invoked,
> rcu_barrier() knows that every callback that was registered before
> the call to rcu_barrier() has also been invoked.
> 
> However, there is no point in registering a callback on a CPU that
> currently has no callbacks, most especially if that CPU is in a
> deep idle state.  This commit therefore makes rcu_barrier() avoid
> interrupting CPUs that have no callbacks.  Doing this requires reworking
> the handling of orphaned callbacks, otherwise callbacks could slip through
> rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> This reworking was needed anyway to take a first step towards weaning
> RCU from the CPU_DYING notifier's use of stop_cpu().
> 

s/stop_cpu()/stop_machine() ?


Thank you for doing this! :-)

> Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
> Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> 


Regards,
Srivatsa S. Bhat
IBM Linux Technology Center

Patch

diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 403306b..8269656 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -75,6 +75,8 @@  static struct lock_class_key rcu_node_class[NUM_RCU_LVLS];
 	.gpnum = -300, \
 	.completed = -300, \
 	.onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \
+	.orphan_nxttail = &structname##_state.orphan_nxtlist, \
+	.orphan_donetail = &structname##_state.orphan_donelist, \
 	.fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \
 	.n_force_qs = 0, \
 	.n_force_qs_ngp = 0, \
@@ -145,6 +147,13 @@  static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp);
 unsigned long rcutorture_testseq;
 unsigned long rcutorture_vernum;
 
+/* State information for rcu_barrier() and friends. */
+
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
+static atomic_t rcu_barrier_cpu_count;
+static DEFINE_MUTEX(rcu_barrier_mutex);
+static DECLARE_WAIT_QUEUE_HEAD(rcu_barrier_wq);
+
 /*
  * Return true if an RCU grace period is in progress.  The ACCESS_ONCE()s
  * permit this function to be invoked without holding the root rcu_node
@@ -1311,7 +1320,60 @@  rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 #ifdef CONFIG_HOTPLUG_CPU
 
 /*
- * Move a dying CPU's RCU callbacks to online CPU's callback list.
+ * Adopt the RCU callbacks from the specified rcu_state structure's
+ * orphanage.  The caller must hold the ->onofflock.
+ */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+	int i;
+	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
+
+	/*
+	 * If there is an rcu_barrier() operation in progress, then
+	 * only the task doing that operation is permitted to adopt
+	 * callbacks.  To do otherwise breaks rcu_barrier() and friends
+	 * by causing them to fail to wait for the callbacks in the
+	 * orphanage.
+	 */
+	if (rsp->rcu_barrier_in_progress &&
+	    rsp->rcu_barrier_in_progress != current)
+		return;
+
+	/* Do the accounting first. */
+	rdp->qlen_lazy += rsp->qlen_lazy;
+	rdp->qlen += rsp->qlen;
+	rdp->n_cbs_adopted += rsp->qlen;
+	rsp->qlen_lazy = 0;
+	rsp->qlen = 0;
+
+	/*
+	 * We do not need a memory barrier here because the only way we
+	 * can get here if there is an rcu_barrier() in flight is if
+	 * we are the task doing the rcu_barrier().
+	 */
+
+	/* First adopt the ready-to-invoke callbacks. */
+	if (rsp->orphan_donelist != NULL) {
+		*rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
+		*rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
+		for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
+			if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
+				rdp->nxttail[i] = rsp->orphan_donetail;
+		rsp->orphan_donelist = NULL;
+		rsp->orphan_donetail = &rsp->orphan_donelist;
+	}
+
+	/* And then adopt the callbacks that still need a grace period. */
+	if (rsp->orphan_nxtlist != NULL) {
+		*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
+		rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
+		rsp->orphan_nxtlist = NULL;
+		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+	}
+}
+
+/*
+ * Move a dying CPU's RCU callbacks to the rcu_state structure's orphanage.
  * Also record a quiescent state for this CPU for the current grace period.
  * Synchronization and interrupt disabling are not required because
  * this function executes in stop_machine() context.  Therefore, cleanup
@@ -1325,64 +1387,67 @@  rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
 {
 	int i;
+	unsigned long flags;
 	unsigned long mask;
-	int receive_cpu = cpumask_any(cpu_online_mask);
+	bool orphaned = 0;
 	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
-	struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu);
 	RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */
 
-	/* First, adjust the counts. */
+	/* Move the callbacks to the orphanage under ->onofflock protection. */
+	raw_spin_lock_irqsave(&rsp->onofflock, flags);
+
+	/* First adjust the counts. */
 	if (rdp->nxtlist != NULL) {
-		receive_rdp->qlen_lazy += rdp->qlen_lazy;
-		receive_rdp->qlen += rdp->qlen;
+		rsp->qlen_lazy += rdp->qlen_lazy;
+		rsp->qlen += rdp->qlen;
+		rdp->n_cbs_orphaned += rdp->qlen;
 		rdp->qlen_lazy = 0;
 		rdp->qlen = 0;
+		orphaned = 1;
 	}
 
 	/*
-	 * Next, move ready-to-invoke callbacks to be invoked on some
-	 * other CPU.  These will not be required to pass through another
-	 * grace period:  They are done, regardless of CPU.
+	 * Next, move those callbacks still needing a grace period to
+	 * the orphanage, where some other CPU will pick them up.
+	 * Some of the callbacks might have gone partway through a grace
+	 * period, but that is too bad.  They get to start over because we
+	 * cannot assume that grace periods are synchronized across CPUs.
+	 * We don't bother updating the ->nxttail[] array yet, instead
+	 * we just reset the whole thing later on.
 	 */
-	if (rdp->nxtlist != NULL &&
-	    rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) {
-		struct rcu_head *oldhead;
-		struct rcu_head **oldtail;
-		struct rcu_head **newtail;
-
-		oldhead = rdp->nxtlist;
-		oldtail = receive_rdp->nxttail[RCU_DONE_TAIL];
-		rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
-		*rdp->nxttail[RCU_DONE_TAIL] = *oldtail;
-		*receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead;
-		newtail = rdp->nxttail[RCU_DONE_TAIL];
-		for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) {
-			if (receive_rdp->nxttail[i] == oldtail)
-				receive_rdp->nxttail[i] = newtail;
-			if (rdp->nxttail[i] == newtail)
-				rdp->nxttail[i] = &rdp->nxtlist;
-		}
+	if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
+		*rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
+		rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
+		*rdp->nxttail[RCU_DONE_TAIL] = NULL;
 	}
 
 	/*
-	 * Finally, put the rest of the callbacks at the end of the list.
-	 * The ones that made it partway through get to start over:  We
-	 * cannot assume that grace periods are synchronized across CPUs.
-	 * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but
-	 * this does not seem compelling.  Not yet, anyway.)
+	 * Then move the ready-to-invoke callbacks to the orphanage,
+	 * where some other CPU will pick them up.  These will not be
+	 * required to pass through another grace period: They are done.
 	 */
 	if (rdp->nxtlist != NULL) {
-		*receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
-		receive_rdp->nxttail[RCU_NEXT_TAIL] =
-				rdp->nxttail[RCU_NEXT_TAIL];
-		receive_rdp->n_cbs_adopted += rdp->qlen;
-		rdp->n_cbs_orphaned += rdp->qlen;
-
-		rdp->nxtlist = NULL;
-		for (i = 0; i < RCU_NEXT_SIZE; i++)
-			rdp->nxttail[i] = &rdp->nxtlist;
+		*rsp->orphan_donetail = rdp->nxtlist;
+		rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
 	}
 
+	/* Finally, initialize the rcu_data structure's list to empty.  */
+	rdp->nxtlist = NULL;
+	for (i = 0; i < RCU_NEXT_SIZE; i++)
+		rdp->nxttail[i] = &rdp->nxtlist;
+
+	/*
+	 * Wake up the rcu_barrier() task if there is one and if we
+	 * actually sent anything to the orphanage.  Except that we
+	 * must delay the wakeup until ->onofflock is released to
+	 * avoid deadlock.
+	 */
+	if (!rsp->rcu_barrier_in_progress)
+		orphaned = 0;
+	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+	if (orphaned)
+		wake_up(&rcu_barrier_wq);
+
 	/*
 	 * Record a quiescent state for the dying CPU.  This is safe
 	 * only because we have already cleared out the callbacks.
@@ -1415,11 +1480,14 @@  static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 	rcu_stop_cpu_kthread(cpu);
 	rcu_node_kthread_setaffinity(rnp, -1);
 
-	/* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */
+	/* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */
 
 	/* Exclude any attempts to start a new grace period. */
 	raw_spin_lock_irqsave(&rsp->onofflock, flags);
 
+	/* Collect the dead CPU's callbacks. */
+	rcu_adopt_orphan_cbs(rsp);
+
 	/* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
 	mask = rdp->grpmask;	/* rnp->grplo is constant. */
 	do {
@@ -1456,6 +1524,10 @@  static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 
 #else /* #ifdef CONFIG_HOTPLUG_CPU */
 
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+}
+
 static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
 {
 }
@@ -1524,9 +1596,6 @@  static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 			    rcu_is_callbacks_kthread());
 
 	/* Update count, and requeue any remaining callbacks. */
-	rdp->qlen_lazy -= count_lazy;
-	rdp->qlen -= count;
-	rdp->n_cbs_invoked += count;
 	if (list != NULL) {
 		*tail = rdp->nxtlist;
 		rdp->nxtlist = list;
@@ -1536,6 +1605,10 @@  static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 			else
 				break;
 	}
+	smp_mb(); /* List handling before counting for rcu_barrier(). */
+	rdp->qlen_lazy -= count_lazy;
+	rdp->qlen -= count;
+	rdp->n_cbs_invoked += count;
 
 	/* Reinstate batch limit if we have worked down the excess. */
 	if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
@@ -1824,13 +1897,14 @@  __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
 	rdp = this_cpu_ptr(rsp->rda);
 
 	/* Add the callback to our list. */
-	*rdp->nxttail[RCU_NEXT_TAIL] = head;
-	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
 	rdp->qlen++;
 	if (lazy)
 		rdp->qlen_lazy++;
 	else
 		rcu_idle_count_callbacks_posted();
+	smp_mb();  /* Count before adding callback for rcu_barrier(). */
+	*rdp->nxttail[RCU_NEXT_TAIL] = head;
+	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
 
 	if (__is_kfree_rcu_offset((unsigned long)func))
 		trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
@@ -2169,15 +2243,10 @@  static int rcu_cpu_has_callbacks(int cpu)
 	       rcu_preempt_cpu_has_callbacks(cpu);
 }
 
-static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
-static atomic_t rcu_barrier_cpu_count;
-static DEFINE_MUTEX(rcu_barrier_mutex);
-static struct completion rcu_barrier_completion;
-
 static void rcu_barrier_callback(struct rcu_head *notused)
 {
 	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-		complete(&rcu_barrier_completion);
+		wake_up(&rcu_barrier_wq);
 }
 
 /*
@@ -2203,27 +2272,121 @@  static void _rcu_barrier(struct rcu_state *rsp,
 			 void (*call_rcu_func)(struct rcu_head *head,
 					       void (*func)(struct rcu_head *head)))
 {
-	BUG_ON(in_interrupt());
+	int cpu;
+	unsigned long flags;
+	struct rcu_data *rdp;
+	struct rcu_head rh;
+
+	init_rcu_head_on_stack(&rh);
+
 	/* Take mutex to serialize concurrent rcu_barrier() requests. */
 	mutex_lock(&rcu_barrier_mutex);
-	init_completion(&rcu_barrier_completion);
+
+	smp_mb();  /* Prevent any prior operations from leaking in. */
+
 	/*
-	 * Initialize rcu_barrier_cpu_count to 1, then invoke
-	 * rcu_barrier_func() on each CPU, so that each CPU also has
-	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
-	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
-	 * might complete its grace period before all of the other CPUs
-	 * did their increment, causing this function to return too
-	 * early.  Note that on_each_cpu() disables irqs, which prevents
-	 * any CPUs from coming online or going offline until each online
-	 * CPU has queued its RCU-barrier callback.
+	 * Initialize the count to one rather than to zero in order to
+	 * avoid a too-soon return to zero in case of a short grace period
+	 * (or preemption of this task).  Also flag this task as doing
+	 * an rcu_barrier().  This will prevent anyone else from adopting
+	 * orphaned callbacks, which could otherwise cause failure if a
+	 * CPU went offline and quickly came back online.  To see this,
+	 * consider the following sequence of events:
+	 *
+	 * 1.	We cause CPU 0 to post an rcu_barrier_callback() callback.
+	 * 2.	CPU 1 goes offline, orphaning its callbacks.
+	 * 3.	CPU 0 adopts CPU 1's orphaned callbacks.
+	 * 4.	CPU 1 comes back online.
+	 * 5.	We cause CPU 1 to post an rcu_barrier_callback() callback.
+	 * 6.	Both rcu_barrier_callback() callbacks are invoked, awakening
+	 *	us -- but before CPU 1's orphaned callbacks are invoked!!!
 	 */
 	atomic_set(&rcu_barrier_cpu_count, 1);
-	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
-	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-		complete(&rcu_barrier_completion);
-	wait_for_completion(&rcu_barrier_completion);
+	raw_spin_lock_irqsave(&rsp->onofflock, flags);
+	rsp->rcu_barrier_in_progress = current;
+	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+	/*
+	 * Force every CPU with callbacks to register a new callback
+	 * that will tell us when all the preceding callbacks have
+	 * been invoked.  If an offline CPU has callbacks, wait for
+	 * it to either come back online or to finish orphaning those
+	 * callbacks.
+	 */
+	for_each_possible_cpu(cpu) {
+		preempt_disable();
+		rdp = per_cpu_ptr(rsp->rda, cpu);
+		if (cpu_is_offline(cpu)) {
+			preempt_enable();
+			while (cpu_is_offline(cpu) &&
+			       ACCESS_ONCE(rdp->qlen))
+				schedule_timeout_interruptible(1);
+		} else if (ACCESS_ONCE(rdp->qlen)) {
+			smp_call_function_single(cpu, rcu_barrier_func,
+						 (void *)call_rcu_func, 1);
+			preempt_enable();
+		}
+	}
+
+	/*
+	 * Force any ongoing CPU-hotplug operations to complete,
+	 * so that any callbacks from the outgoing CPUs are now in
+	 * the orphanage.
+	 */
+	cpu_maps_update_begin();
+	cpu_maps_update_done();
+
+	/*
+	 * Now that all online CPUs have rcu_barrier_callback() callbacks
+	 * posted, we can adopt all of the orphaned callbacks and place
+	 * an rcu_barrier_callback() callback after them.  When that is done,
+	 * we are guaranteed to have an rcu_barrier_callback() callback
+	 * following every callback that could possibly have been
+	 * registered before _rcu_barrier() was called.
+	 */
+	raw_spin_lock_irqsave(&rsp->onofflock, flags);
+	rcu_adopt_orphan_cbs(rsp);
+	atomic_inc(&rcu_barrier_cpu_count);
+	call_rcu_func(&rh, rcu_barrier_callback);
+	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+	/*
+	 * Now that we have an rcu_barrier_callback() callback on each
+	 * CPU, and thus each counted, remove the initial count.
+	 */
+	atomic_dec(&rcu_barrier_cpu_count);
+	smp_mb__after_atomic_dec();
+
+	/*
+	 * Loop waiting for all rcu_barrier_callback() callbacks to be
+	 * invoked.  Adopt any orphaned callbacks in the meantime, just
+	 * in case one of the rcu_barrier_callback() callbacks is orphaned.
+	 */
+	while (atomic_read(&rcu_barrier_cpu_count) > 0) {
+		wait_event(rcu_barrier_wq,
+			   atomic_read(&rcu_barrier_cpu_count) == 0 ||
+			   ACCESS_ONCE(rsp->qlen));
+		if (ACCESS_ONCE(rsp->qlen)) {
+			raw_spin_lock_irqsave(&rsp->onofflock, flags);
+			rcu_adopt_orphan_cbs(rsp);
+			raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+		}
+	}
+
+	/*
+	 * Done, so let others adopt orphaned callbacks.  But avoid
+	 * indefinite postponement of any additional orphans by adopting
+	 * one more time.
+	 */
+	raw_spin_lock_irqsave(&rsp->onofflock, flags);
+	rcu_adopt_orphan_cbs(rsp);
+	rsp->rcu_barrier_in_progress = NULL;
+	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+	/* Other rcu_barrier() invocations can now safely proceed. */
 	mutex_unlock(&rcu_barrier_mutex);
+
+	destroy_rcu_head_on_stack(&rh);
 }
 
 /**
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 36ca28e..1e49c56 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -371,6 +371,17 @@  struct rcu_state {
 
 	raw_spinlock_t onofflock;		/* exclude on/offline and */
 						/*  starting new GP. */
+	struct rcu_head *orphan_nxtlist;	/* Orphaned callbacks that */
+						/*  need a grace period. */
+	struct rcu_head **orphan_nxttail;	/* Tail of above. */
+	struct rcu_head *orphan_donelist;	/* Orphaned callbacks that */
+						/*  are ready to invoke. */
+	struct rcu_head **orphan_donetail;	/* Tail of above. */
+	long qlen_lazy;				/* Number of lazy callbacks. */
+	long qlen;				/* Total number of callbacks. */
+	struct task_struct *rcu_barrier_in_progress;
+						/* Task doing rcu_barrier(), */
+						/*  or NULL if no barrier. */
 	raw_spinlock_t fqslock;			/* Only one task forcing */
 						/*  quiescent states. */
 	unsigned long jiffies_force_qs;		/* Time at which to invoke */
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index ed459ed..d4bc16d 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -271,13 +271,13 @@  static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 
 	gpnum = rsp->gpnum;
 	seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x "
-		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
+		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
 		   rsp->completed, gpnum, rsp->fqs_state,
 		   (long)(rsp->jiffies_force_qs - jiffies),
 		   (int)(jiffies & 0xffff),
 		   rsp->n_force_qs, rsp->n_force_qs_ngp,
 		   rsp->n_force_qs - rsp->n_force_qs_ngp,
-		   rsp->n_force_qs_lh);
+		   rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen);
 	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
 		if (rnp->level != level) {
 			seq_puts(m, "\n");