diff mbox

[tip/core/rcu,1/3] rcu: Remove _rcu_barrier() dependency on __stop_machine()

Message ID 1346353383-350-1-git-send-email-paulmck@linux.vnet.ibm.com
State Accepted
Commit 1331e7a1bbe1f11b19c4327ba0853bee2a606543
Headers show

Commit Message

Paul E. McKenney Aug. 30, 2012, 7:03 p.m. UTC
From: "Paul E. McKenney" <paul.mckenney@linaro.org>

Currently, _rcu_barrier() relies on preempt_disable() to prevent
any CPU from going offline, which in turn depends on CPU hotplug's
use of __stop_machine().

This patch therefore makes _rcu_barrier() use get_online_cpus() to
block CPU-hotplug operations.  This has the added benefit of removing
the need for _rcu_barrier() to adopt callbacks:  Because CPU-hotplug
operations are excluded, there can be no callbacks to adopt.  This
commit simplifies the code accordingly.

Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
---
 kernel/rcutree.c       |   83 ++++++-----------------------------------------
 kernel/rcutree.h       |    3 --
 kernel/rcutree_trace.c |    4 +-
 3 files changed, 13 insertions(+), 77 deletions(-)

Comments

Josh Triplett Aug. 31, 2012, 4:09 p.m. UTC | #1
On Thu, Aug 30, 2012 at 12:03:01PM -0700, Paul E. McKenney wrote:
> From: "Paul E. McKenney" <paul.mckenney@linaro.org>
> 
> Currently, _rcu_barrier() relies on preempt_disable() to prevent
> any CPU from going offline, which in turn depends on CPU hotplug's
> use of __stop_machine().
> 
> This patch therefore makes _rcu_barrier() use get_online_cpus() to
> block CPU-hotplug operations.  This has the added benefit of removing
> the need for _rcu_barrier() to adopt callbacks:  Because CPU-hotplug
> operations are excluded, there can be no callbacks to adopt.  This
> commit simplifies the code accordingly.
> 
> Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
> Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>

Impressive simplification!

Reviewed-by: Josh Triplett <josh@joshtriplett.org>

> ---
>  kernel/rcutree.c       |   83 ++++++-----------------------------------------
>  kernel/rcutree.h       |    3 --
>  kernel/rcutree_trace.c |    4 +-
>  3 files changed, 13 insertions(+), 77 deletions(-)
> 
> diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> index f280e54..9854a00 100644
> --- a/kernel/rcutree.c
> +++ b/kernel/rcutree.c
> @@ -1390,17 +1390,6 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
>  	int i;
>  	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
>  
> -	/*
> -	 * If there is an rcu_barrier() operation in progress, then
> -	 * only the task doing that operation is permitted to adopt
> -	 * callbacks.  To do otherwise breaks rcu_barrier() and friends
> -	 * by causing them to fail to wait for the callbacks in the
> -	 * orphanage.
> -	 */
> -	if (rsp->rcu_barrier_in_progress &&
> -	    rsp->rcu_barrier_in_progress != current)
> -		return;
> -
>  	/* Do the accounting first. */
>  	rdp->qlen_lazy += rsp->qlen_lazy;
>  	rdp->qlen += rsp->qlen;
> @@ -1455,9 +1444,8 @@ static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
>   * The CPU has been completely removed, and some other CPU is reporting
>   * this fact from process context.  Do the remainder of the cleanup,
>   * including orphaning the outgoing CPU's RCU callbacks, and also
> - * adopting them, if there is no _rcu_barrier() instance running.
> - * There can only be one CPU hotplug operation at a time, so no other
> - * CPU can be attempting to update rcu_cpu_kthread_task.
> + * adopting them.  There can only be one CPU hotplug operation at a time,
> + * so no other CPU can be attempting to update rcu_cpu_kthread_task.
>   */
>  static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
>  {
> @@ -1519,10 +1507,6 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
>  
>  #else /* #ifdef CONFIG_HOTPLUG_CPU */
>  
> -static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> -{
> -}
> -
>  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
>  {
>  }
> @@ -2326,13 +2310,10 @@ static void rcu_barrier_func(void *type)
>  static void _rcu_barrier(struct rcu_state *rsp)
>  {
>  	int cpu;
> -	unsigned long flags;
>  	struct rcu_data *rdp;
> -	struct rcu_data rd;
>  	unsigned long snap = ACCESS_ONCE(rsp->n_barrier_done);
>  	unsigned long snap_done;
>  
> -	init_rcu_head_on_stack(&rd.barrier_head);
>  	_rcu_barrier_trace(rsp, "Begin", -1, snap);
>  
>  	/* Take mutex to serialize concurrent rcu_barrier() requests. */
> @@ -2372,70 +2353,30 @@ static void _rcu_barrier(struct rcu_state *rsp)
>  	/*
>  	 * Initialize the count to one rather than to zero in order to
>  	 * avoid a too-soon return to zero in case of a short grace period
> -	 * (or preemption of this task).  Also flag this task as doing
> -	 * an rcu_barrier().  This will prevent anyone else from adopting
> -	 * orphaned callbacks, which could cause otherwise failure if a
> -	 * CPU went offline and quickly came back online.  To see this,
> -	 * consider the following sequence of events:
> -	 *
> -	 * 1.	We cause CPU 0 to post an rcu_barrier_callback() callback.
> -	 * 2.	CPU 1 goes offline, orphaning its callbacks.
> -	 * 3.	CPU 0 adopts CPU 1's orphaned callbacks.
> -	 * 4.	CPU 1 comes back online.
> -	 * 5.	We cause CPU 1 to post an rcu_barrier_callback() callback.
> -	 * 6.	Both rcu_barrier_callback() callbacks are invoked, awakening
> -	 *	us -- but before CPU 1's orphaned callbacks are invoked!!!
> +	 * (or preemption of this task).  Exclude CPU-hotplug operations
> +	 * to ensure that no offline CPU has callbacks queued.
>  	 */
>  	init_completion(&rsp->barrier_completion);
>  	atomic_set(&rsp->barrier_cpu_count, 1);
> -	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> -	rsp->rcu_barrier_in_progress = current;
> -	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> +	get_online_cpus();
>  
>  	/*
> -	 * Force every CPU with callbacks to register a new callback
> -	 * that will tell us when all the preceding callbacks have
> -	 * been invoked.  If an offline CPU has callbacks, wait for
> -	 * it to either come back online or to finish orphaning those
> -	 * callbacks.
> +	 * Force each CPU with callbacks to register a new callback.
> +	 * When that callback is invoked, we will know that all of the
> +	 * corresponding CPU's preceding callbacks have been invoked.
>  	 */
> -	for_each_possible_cpu(cpu) {
> -		preempt_disable();
> +	for_each_online_cpu(cpu) {
>  		rdp = per_cpu_ptr(rsp->rda, cpu);
> -		if (cpu_is_offline(cpu)) {
> -			_rcu_barrier_trace(rsp, "Offline", cpu,
> -					   rsp->n_barrier_done);
> -			preempt_enable();
> -			while (cpu_is_offline(cpu) && ACCESS_ONCE(rdp->qlen))
> -				schedule_timeout_interruptible(1);
> -		} else if (ACCESS_ONCE(rdp->qlen)) {
> +		if (ACCESS_ONCE(rdp->qlen)) {
>  			_rcu_barrier_trace(rsp, "OnlineQ", cpu,
>  					   rsp->n_barrier_done);
>  			smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
> -			preempt_enable();
>  		} else {
>  			_rcu_barrier_trace(rsp, "OnlineNQ", cpu,
>  					   rsp->n_barrier_done);
> -			preempt_enable();
>  		}
>  	}
> -
> -	/*
> -	 * Now that all online CPUs have rcu_barrier_callback() callbacks
> -	 * posted, we can adopt all of the orphaned callbacks and place
> -	 * an rcu_barrier_callback() callback after them.  When that is done,
> -	 * we are guaranteed to have an rcu_barrier_callback() callback
> -	 * following every callback that could possibly have been
> -	 * registered before _rcu_barrier() was called.
> -	 */
> -	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> -	rcu_adopt_orphan_cbs(rsp);
> -	rsp->rcu_barrier_in_progress = NULL;
> -	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> -	atomic_inc(&rsp->barrier_cpu_count);
> -	smp_mb__after_atomic_inc(); /* Ensure atomic_inc() before callback. */
> -	rd.rsp = rsp;
> -	rsp->call(&rd.barrier_head, rcu_barrier_callback);
> +	put_online_cpus();
>  
>  	/*
>  	 * Now that we have an rcu_barrier_callback() callback on each
> @@ -2456,8 +2397,6 @@ static void _rcu_barrier(struct rcu_state *rsp)
>  
>  	/* Other rcu_barrier() invocations can now safely proceed. */
>  	mutex_unlock(&rsp->barrier_mutex);
> -
> -	destroy_rcu_head_on_stack(&rd.barrier_head);
>  }
>  
>  /**
> diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> index 4d29169..94dfdf1 100644
> --- a/kernel/rcutree.h
> +++ b/kernel/rcutree.h
> @@ -398,9 +398,6 @@ struct rcu_state {
>  	struct rcu_head **orphan_donetail;	/* Tail of above. */
>  	long qlen_lazy;				/* Number of lazy callbacks. */
>  	long qlen;				/* Total number of callbacks. */
> -	struct task_struct *rcu_barrier_in_progress;
> -						/* Task doing rcu_barrier(), */
> -						/*  or NULL if no barrier. */
>  	struct mutex barrier_mutex;		/* Guards barrier fields. */
>  	atomic_t barrier_cpu_count;		/* # CPUs waiting on. */
>  	struct completion barrier_completion;	/* Wake at barrier end. */
> diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> index abffb48..6a2e52a 100644
> --- a/kernel/rcutree_trace.c
> +++ b/kernel/rcutree_trace.c
> @@ -51,8 +51,8 @@ static int show_rcubarrier(struct seq_file *m, void *unused)
>  	struct rcu_state *rsp;
>  
>  	for_each_rcu_flavor(rsp)
> -		seq_printf(m, "%s: %c bcc: %d nbd: %lu\n",
> -			   rsp->name, rsp->rcu_barrier_in_progress ? 'B' : '.',
> +		seq_printf(m, "%s: bcc: %d nbd: %lu\n",
> +			   rsp->name,
>  			   atomic_read(&rsp->barrier_cpu_count),
>  			   rsp->n_barrier_done);
>  	return 0;
> -- 
> 1.7.8
>
diff mbox

Patch

diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index f280e54..9854a00 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -1390,17 +1390,6 @@  static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
 	int i;
 	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
-	/*
-	 * If there is an rcu_barrier() operation in progress, then
-	 * only the task doing that operation is permitted to adopt
-	 * callbacks.  To do otherwise breaks rcu_barrier() and friends
-	 * by causing them to fail to wait for the callbacks in the
-	 * orphanage.
-	 */
-	if (rsp->rcu_barrier_in_progress &&
-	    rsp->rcu_barrier_in_progress != current)
-		return;
-
 	/* Do the accounting first. */
 	rdp->qlen_lazy += rsp->qlen_lazy;
 	rdp->qlen += rsp->qlen;
@@ -1455,9 +1444,8 @@  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
  * The CPU has been completely removed, and some other CPU is reporting
  * this fact from process context.  Do the remainder of the cleanup,
  * including orphaning the outgoing CPU's RCU callbacks, and also
- * adopting them, if there is no _rcu_barrier() instance running.
- * There can only be one CPU hotplug operation at a time, so no other
- * CPU can be attempting to update rcu_cpu_kthread_task.
+ * adopting them.  There can only be one CPU hotplug operation at a time,
+ * so no other CPU can be attempting to update rcu_cpu_kthread_task.
  */
 static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 {
@@ -1519,10 +1507,6 @@  static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 
 #else /* #ifdef CONFIG_HOTPLUG_CPU */
 
-static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
-{
-}
-
 static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
 {
 }
@@ -2326,13 +2310,10 @@  static void rcu_barrier_func(void *type)
 static void _rcu_barrier(struct rcu_state *rsp)
 {
 	int cpu;
-	unsigned long flags;
 	struct rcu_data *rdp;
-	struct rcu_data rd;
 	unsigned long snap = ACCESS_ONCE(rsp->n_barrier_done);
 	unsigned long snap_done;
 
-	init_rcu_head_on_stack(&rd.barrier_head);
 	_rcu_barrier_trace(rsp, "Begin", -1, snap);
 
 	/* Take mutex to serialize concurrent rcu_barrier() requests. */
@@ -2372,70 +2353,30 @@  static void _rcu_barrier(struct rcu_state *rsp)
 	/*
 	 * Initialize the count to one rather than to zero in order to
 	 * avoid a too-soon return to zero in case of a short grace period
-	 * (or preemption of this task).  Also flag this task as doing
-	 * an rcu_barrier().  This will prevent anyone else from adopting
-	 * orphaned callbacks, which could cause otherwise failure if a
-	 * CPU went offline and quickly came back online.  To see this,
-	 * consider the following sequence of events:
-	 *
-	 * 1.	We cause CPU 0 to post an rcu_barrier_callback() callback.
-	 * 2.	CPU 1 goes offline, orphaning its callbacks.
-	 * 3.	CPU 0 adopts CPU 1's orphaned callbacks.
-	 * 4.	CPU 1 comes back online.
-	 * 5.	We cause CPU 1 to post an rcu_barrier_callback() callback.
-	 * 6.	Both rcu_barrier_callback() callbacks are invoked, awakening
-	 *	us -- but before CPU 1's orphaned callbacks are invoked!!!
+	 * (or preemption of this task).  Exclude CPU-hotplug operations
+	 * to ensure that no offline CPU has callbacks queued.
 	 */
 	init_completion(&rsp->barrier_completion);
 	atomic_set(&rsp->barrier_cpu_count, 1);
-	raw_spin_lock_irqsave(&rsp->onofflock, flags);
-	rsp->rcu_barrier_in_progress = current;
-	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+	get_online_cpus();
 
 	/*
-	 * Force every CPU with callbacks to register a new callback
-	 * that will tell us when all the preceding callbacks have
-	 * been invoked.  If an offline CPU has callbacks, wait for
-	 * it to either come back online or to finish orphaning those
-	 * callbacks.
+	 * Force each CPU with callbacks to register a new callback.
+	 * When that callback is invoked, we will know that all of the
+	 * corresponding CPU's preceding callbacks have been invoked.
 	 */
-	for_each_possible_cpu(cpu) {
-		preempt_disable();
+	for_each_online_cpu(cpu) {
 		rdp = per_cpu_ptr(rsp->rda, cpu);
-		if (cpu_is_offline(cpu)) {
-			_rcu_barrier_trace(rsp, "Offline", cpu,
-					   rsp->n_barrier_done);
-			preempt_enable();
-			while (cpu_is_offline(cpu) && ACCESS_ONCE(rdp->qlen))
-				schedule_timeout_interruptible(1);
-		} else if (ACCESS_ONCE(rdp->qlen)) {
+		if (ACCESS_ONCE(rdp->qlen)) {
 			_rcu_barrier_trace(rsp, "OnlineQ", cpu,
 					   rsp->n_barrier_done);
 			smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
-			preempt_enable();
 		} else {
 			_rcu_barrier_trace(rsp, "OnlineNQ", cpu,
 					   rsp->n_barrier_done);
-			preempt_enable();
 		}
 	}
-
-	/*
-	 * Now that all online CPUs have rcu_barrier_callback() callbacks
-	 * posted, we can adopt all of the orphaned callbacks and place
-	 * an rcu_barrier_callback() callback after them.  When that is done,
-	 * we are guaranteed to have an rcu_barrier_callback() callback
-	 * following every callback that could possibly have been
-	 * registered before _rcu_barrier() was called.
-	 */
-	raw_spin_lock_irqsave(&rsp->onofflock, flags);
-	rcu_adopt_orphan_cbs(rsp);
-	rsp->rcu_barrier_in_progress = NULL;
-	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
-	atomic_inc(&rsp->barrier_cpu_count);
-	smp_mb__after_atomic_inc(); /* Ensure atomic_inc() before callback. */
-	rd.rsp = rsp;
-	rsp->call(&rd.barrier_head, rcu_barrier_callback);
+	put_online_cpus();
 
 	/*
 	 * Now that we have an rcu_barrier_callback() callback on each
@@ -2456,8 +2397,6 @@  static void _rcu_barrier(struct rcu_state *rsp)
 
 	/* Other rcu_barrier() invocations can now safely proceed. */
 	mutex_unlock(&rsp->barrier_mutex);
-
-	destroy_rcu_head_on_stack(&rd.barrier_head);
 }
 
 /**
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 4d29169..94dfdf1 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -398,9 +398,6 @@  struct rcu_state {
 	struct rcu_head **orphan_donetail;	/* Tail of above. */
 	long qlen_lazy;				/* Number of lazy callbacks. */
 	long qlen;				/* Total number of callbacks. */
-	struct task_struct *rcu_barrier_in_progress;
-						/* Task doing rcu_barrier(), */
-						/*  or NULL if no barrier. */
 	struct mutex barrier_mutex;		/* Guards barrier fields. */
 	atomic_t barrier_cpu_count;		/* # CPUs waiting on. */
 	struct completion barrier_completion;	/* Wake at barrier end. */
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index abffb48..6a2e52a 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -51,8 +51,8 @@  static int show_rcubarrier(struct seq_file *m, void *unused)
 	struct rcu_state *rsp;
 
 	for_each_rcu_flavor(rsp)
-		seq_printf(m, "%s: %c bcc: %d nbd: %lu\n",
-			   rsp->name, rsp->rcu_barrier_in_progress ? 'B' : '.',
+		seq_printf(m, "%s: bcc: %d nbd: %lu\n",
+			   rsp->name,
 			   atomic_read(&rsp->barrier_cpu_count),
 			   rsp->n_barrier_done);
 	return 0;