[tip/core/rcu,1/2] rcu: Add callback-free CPUs

Message ID: 1351655191-2648-1-git-send-email-paulmck@linux.vnet.ibm.com
State: New

Commit Message

Paul E. McKenney Oct. 31, 2012, 3:46 a.m. UTC
From: "Paul E. McKenney" <paul.mckenney@linaro.org>

RCU callback execution can add significant OS jitter and can also degrade
scheduling latency.  This commit therefore adds the ability for selected
CPUs ("rcu_nocbs=" boot parameter) to have their callbacks offloaded to
kthreads.  If the "rcu_nocb_poll" boot parameter is also specified, these
kthreads will do polling, removing the need for the offloaded CPUs to do
wakeups.  At least one CPU must be doing normal callback processing:
currently CPU 0 cannot be selected as a no-CBs CPU.  In addition, attempts
to offline the last normal-CBs CPU will fail.
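
For example (a hypothetical illustration, assuming CONFIG_RCU_NOCB_CPU=y
on an eight-CPU system), booting with

	rcu_nocbs=1-7 rcu_nocb_poll

offloads callback invocation for CPUs 1-7 to their "rcuoN" kthreads and
makes those kthreads poll instead of waiting for wakeups from the
offloaded CPUs.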

This feature was inspired by Jim Houston's and Joe Korty's JRCU, and
this commit includes fixes to problems located by Fengguang Wu's
kbuild test robot.

[ paulmck: Added gfp.h include file as suggested by Fengguang Wu. ]

Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
---
 Documentation/kernel-parameters.txt |    5 +
 include/trace/events/rcu.h          |    1 +
 init/Kconfig                        |   19 ++
 kernel/rcutree.c                    |   63 +++++-
 kernel/rcutree.h                    |   47 ++++
 kernel/rcutree_plugin.h             |  397 ++++++++++++++++++++++++++++++++++-
 kernel/rcutree_trace.c              |    7 +-
 7 files changed, 523 insertions(+), 16 deletions(-)

Comments

Frederic Weisbecker Oct. 31, 2012, 2:10 p.m. UTC | #1
2012/10/31 Paul E. McKenney <paulmck@linux.vnet.ibm.com>:
> +/*
> + * Per-rcu_data kthread, but only for no-CBs CPUs.  Each kthread invokes
> + * callbacks queued by the corresponding no-CBs CPU.
> + */
> +static int rcu_nocb_kthread(void *arg)
> +{
> +       int c, cl;
> +       struct rcu_head *list;
> +       struct rcu_head *next;
> +       struct rcu_head **tail;
> +       struct rcu_data *rdp = arg;
> +
> +       /* Each pass through this loop invokes one batch of callbacks */
> +       for (;;) {
> +               /* If not polling, wait for next batch of callbacks. */
> +               if (!rcu_nocb_poll)
> +                       wait_event(rdp->nocb_wq, rdp->nocb_head);
> +               list = ACCESS_ONCE(rdp->nocb_head);
> +               if (!list) {
> +                       schedule_timeout_interruptible(1);
> +                       continue;
> +               }
> +
> +               /*
> +                * Extract queued callbacks, update counts, and wait
> +                * for a grace period to elapse.
> +                */
> +               ACCESS_ONCE(rdp->nocb_head) = NULL;
> +               tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
> +               c = atomic_long_xchg(&rdp->nocb_q_count, 0);
> +               cl = atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
> +               ACCESS_ONCE(rdp->nocb_p_count) += c;
> +               ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
> +               wait_rcu_gp(rdp->rsp->call_remote);
> +
> +               /* Each pass through the following loop invokes a callback. */
> +               trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
> +               c = cl = 0;
> +               while (list) {
> +                       next = list->next;
> +                       /* Wait for enqueuing to complete, if needed. */
> +                       while (next == NULL && &list->next != tail) {
> +                               schedule_timeout_interruptible(1);
> +                               next = list->next;
> +                       }
> +                       debug_rcu_head_unqueue(list);
> +                       local_bh_disable();
> +                       if (__rcu_reclaim(rdp->rsp->name, list))
> +                               cl++;
> +                       c++;
> +                       local_bh_enable();
> +                       list = next;
> +               }
> +               trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
> +               ACCESS_ONCE(rdp->nocb_p_count) -= c;
> +               ACCESS_ONCE(rdp->nocb_p_count_lazy) -= cl;
> +               rdp->n_cbs_invoked += c;
> +       }
> +       return 0;
> +}
> +
> +/* Initialize per-rcu_data variables for no-CBs CPUs. */
> +static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
> +{
> +       rdp->nocb_tail = &rdp->nocb_head;
> +       init_waitqueue_head(&rdp->nocb_wq);
> +}
> +
> +/* Create a kthread for each RCU flavor for each no-CBs CPU. */
> +static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
> +{
> +       int cpu;
> +       struct rcu_data *rdp;
> +       struct task_struct *t;
> +
> +       if (rcu_nocb_mask == NULL)
> +               return;
> +       for_each_cpu(cpu, rcu_nocb_mask) {
> +               rdp = per_cpu_ptr(rsp->rda, cpu);
> +               t = kthread_run(rcu_nocb_kthread, rdp, "rcuo%d", cpu);

Sorry, I think I left my brain in the middle of the diff, but I think
there is something I'm misunderstanding. Here you're creating an
rcu_nocb_kthread per no-CBs CPU, and looking at the code of
rcu_nocb_kthread(), it seems to execute the callbacks with
__rcu_reclaim().

So, in the end, the no-callbacks CPUs execute their own callbacks.
Isn't that the opposite of what is expected? (Again, probably just my
misunderstanding.)

Thanks.

> +               BUG_ON(IS_ERR(t));
> +               ACCESS_ONCE(rdp->nocb_kthread) = t;
> +       }
> +}
Paul E. McKenney Nov. 2, 2012, 6:41 p.m. UTC | #2
On Wed, Oct 31, 2012 at 03:10:04PM +0100, Frederic Weisbecker wrote:
> 2012/10/31 Paul E. McKenney <paulmck@linux.vnet.ibm.com>:
> > [ . . . rcu_nocb_kthread() and rcu_spawn_nocb_kthreads() quoted in full in #1 above . . . ]
> 
> Sorry, I think I left my brain in the middle of the diff, but I think
> there is something I'm misunderstanding. Here you're creating an
> rcu_nocb_kthread per no-CBs CPU, and looking at the code of
> rcu_nocb_kthread(), it seems to execute the callbacks with
> __rcu_reclaim().

True, the callbacks execute within the context of the kthread, which
might be running on any CPU.

> So, in the end, the no-callbacks CPUs execute their own callbacks.
> Isn't that the opposite of what is expected? (Again, probably just my
> misunderstanding.)

The no-callbacks CPU would execute its own callbacks only if the
corresponding kthread happened to be executing on that CPU.
If you wanted a given CPU to be completely free of callbacks,
you would instead constrain the corresponding kthread to run
elsewhere.
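
For example, here is a minimal sketch of that constraint (not part of
this patch; the function name rcu_spawn_nocb_kthreads_bound() and the
mask non_nocb_mask are made up for illustration).  It binds each "rcuo"
kthread to the CPUs outside rcu_nocb_mask right after spawning it:

/*
 * Hypothetical variant of rcu_spawn_nocb_kthreads() that keeps each
 * "rcuo" kthread off the no-CBs CPUs, so that those CPUs never invoke
 * their own callbacks.
 */
static void __init rcu_spawn_nocb_kthreads_bound(struct rcu_state *rsp)
{
	cpumask_var_t non_nocb_mask;
	bool bind;
	int cpu;
	struct rcu_data *rdp;
	struct task_struct *t;

	if (rcu_nocb_mask == NULL)
		return;
	/* If the mask cannot be allocated, spawn unbound kthreads. */
	bind = alloc_cpumask_var(&non_nocb_mask, GFP_KERNEL);
	if (bind)
		cpumask_andnot(non_nocb_mask, cpu_possible_mask,
			       rcu_nocb_mask);
	for_each_cpu(cpu, rcu_nocb_mask) {
		rdp = per_cpu_ptr(rsp->rda, cpu);
		t = kthread_run(rcu_nocb_kthread, rdp, "rcuo%d", cpu);
		BUG_ON(IS_ERR(t));
		/* Keep this kthread off every no-CBs CPU. */
		if (bind)
			set_cpus_allowed_ptr(t, non_nocb_mask);
		ACCESS_ONCE(rdp->nocb_kthread) = t;
	}
	if (bind)
		free_cpumask_var(non_nocb_mask);
}

Equivalently, as the Kconfig help text notes, user-space affinity
settings or cgroups can push the kthreads off the no-CBs CPUs after
boot.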

							Thanx, Paul

Frederic Weisbecker Nov. 2, 2012, 9:21 p.m. UTC | #3
2012/11/2 Paul E. McKenney <paulmck@linux.vnet.ibm.com>:
> The no-callbacks CPU would execute its own callbacks only if the
> corresponding kthread happened to be executing on that CPU.
> If you wanted a given CPU to be completely free of callbacks,
> you would instead constrain the corresponding kthread to run
> elsewhere.

Ah ok, makes sense.

Thanks.

Patch

diff --git a/Documentation/kernel-parameters.txt b/Documentation/kernel-parameters.txt
index 9776f06..dfd03272 100644
--- a/Documentation/kernel-parameters.txt
+++ b/Documentation/kernel-parameters.txt
@@ -2394,6 +2394,11 @@  bytes respectively. Such letter suffixes can also be entirely omitted.
 	ramdisk_size=	[RAM] Sizes of RAM disks in kilobytes
 			See Documentation/blockdev/ramdisk.txt.
 
+	rcu_nocbs=	[KNL,BOOT]
+			Set the specified list of CPUs to be no-callback
+			CPUs.  Invocation of these CPUs' RCU callbacks will
+			be offloaded to kthreads created for that purpose.
+
 	rcutree.blimit=	[KNL,BOOT]
 			Set maximum number of finished RCU callbacks to process
 			in one batch.
diff --git a/include/trace/events/rcu.h b/include/trace/events/rcu.h
index 5bde94d..d4f559b 100644
--- a/include/trace/events/rcu.h
+++ b/include/trace/events/rcu.h
@@ -549,6 +549,7 @@  TRACE_EVENT(rcu_torture_read,
  *	"EarlyExit": rcu_barrier_callback() piggybacked, thus early exit.
  *	"Inc1": rcu_barrier_callback() piggyback check counter incremented.
  *	"Offline": rcu_barrier_callback() found offline CPU
+ *	"OnlineNoCB": rcu_barrier_callback() found online no-CBs CPU.
  *	"OnlineQ": rcu_barrier_callback() found online CPU with callbacks.
  *	"OnlineNQ": rcu_barrier_callback() found online CPU, no callbacks.
  *	"IRQ": An rcu_barrier_callback() callback posted on remote CPU.
diff --git a/init/Kconfig b/init/Kconfig
index ec62139..e2343d4 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -654,6 +654,25 @@  config RCU_BOOST_DELAY
 
 	  Accept the default if unsure.
 
+config RCU_NOCB_CPU
+	bool "Offload RCU callback processing from boot-selected CPUs"
+	depends on TREE_RCU || TREE_PREEMPT_RCU
+	default n
+	help
+	  Use this option to reduce OS jitter for aggressive HPC or
+	  real-time workloads.
+
+	  This option offloads callback invocation from the set of CPUs
+	  specified at boot time by the rcu_nocbs parameter.  For each
+	  such CPU, a kthread ("rcuoN") will be created to invoke callbacks.
+	  Nothing prevents this kthread from running on any of the specified
+	  CPUs, but (1) the kthreads may be preempted between each callback
+	  and (2) affinity or cgroups can be used to force the kthreads off
+	  of those CPUs if desired.
+
+	  Say Y here if you want reduced OS jitter on selected CPUs.
+	  Say N here if you are unsure.
+
 endmenu # "RCU Subsystem"
 
 config IKCONFIG
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 9ce19c9..1523c47 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -297,7 +297,8 @@  EXPORT_SYMBOL_GPL(rcu_sched_force_quiescent_state);
 static int
 cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 {
-	return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL];
+	return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
+	       rdp->nxttail[RCU_DONE_TAIL] != NULL;
 }
 
 /*
@@ -306,8 +307,11 @@  cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 static int
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-	return *rdp->nxttail[RCU_DONE_TAIL +
-			     (ACCESS_ONCE(rsp->completed) != rdp->completed)] &&
+	struct rcu_head **ntp;
+
+	ntp = rdp->nxttail[RCU_DONE_TAIL +
+			   (ACCESS_ONCE(rsp->completed) != rdp->completed)];
+	return rdp->nxttail[RCU_DONE_TAIL] && ntp && *ntp &&
 	       !rcu_gp_in_progress(rsp);
 }
 
@@ -1084,6 +1088,7 @@  static void init_callback_list(struct rcu_data *rdp)
 	rdp->nxtlist = NULL;
 	for (i = 0; i < RCU_NEXT_SIZE; i++)
 		rdp->nxttail[i] = &rdp->nxtlist;
+	init_nocb_callback_list(rdp);
 }
 
 /*
@@ -1594,6 +1599,10 @@  static void
 rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 			  struct rcu_node *rnp, struct rcu_data *rdp)
 {
+	/* No-CBs CPUs do not have orphanable callbacks. */
+	if (is_nocb_cpu(rdp->cpu))
+		return;
+
 	/*
 	 * Orphan the callbacks.  First adjust the counts.  This is safe
 	 * because _rcu_barrier() excludes CPU-hotplug operations, so it
@@ -1645,6 +1654,10 @@  static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
 	int i;
 	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
+	/* No-CBs CPUs are handled specially. */
+	if (rcu_nocb_adopt_orphan_cbs(rsp, rdp))
+		return;
+
 	/* Do the accounting first. */
 	rdp->qlen_lazy += rsp->qlen_lazy;
 	rdp->qlen += rsp->qlen;
@@ -2123,9 +2136,15 @@  static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
 	}
 }
 
+/*
+ * Helper function for call_rcu() and friends.  The cpu argument will
+ * normally be -1, indicating "currently running CPU".  It may specify
+ * a CPU only if that CPU is a no-CBs CPU.  Currently, only _rcu_barrier()
+ * is expected to specify a CPU.
+ */
 static void
 __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
-	   struct rcu_state *rsp, bool lazy)
+	   struct rcu_state *rsp, int cpu, bool lazy)
 {
 	unsigned long flags;
 	struct rcu_data *rdp;
@@ -2145,9 +2164,14 @@  __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
 	rdp = this_cpu_ptr(rsp->rda);
 
 	/* Add the callback to our list. */
-	if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL)) {
+	if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+		int offline;
+
+		if (cpu != -1)
+			rdp = per_cpu_ptr(rsp->rda, cpu);
+		offline = !__call_rcu_nocb(rdp, head, lazy);
+		WARN_ON_ONCE(offline);
 		/* _call_rcu() is illegal on offline CPU; leak the callback. */
-		WARN_ON_ONCE(1);
 		local_irq_restore(flags);
 		return;
 	}
@@ -2176,7 +2200,7 @@  __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
  */
 void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_sched_state, 0);
+	__call_rcu(head, func, &rcu_sched_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_sched);
 
@@ -2185,7 +2209,7 @@  EXPORT_SYMBOL_GPL(call_rcu_sched);
  */
 void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_bh_state, 0);
+	__call_rcu(head, func, &rcu_bh_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
@@ -2632,9 +2656,17 @@  static void _rcu_barrier(struct rcu_state *rsp)
 	 * When that callback is invoked, we will know that all of the
 	 * corresponding CPU's preceding callbacks have been invoked.
 	 */
-	for_each_online_cpu(cpu) {
+	for_each_possible_cpu(cpu) {
+		if (!cpu_online(cpu) && !is_nocb_cpu(cpu))
+			continue;
 		rdp = per_cpu_ptr(rsp->rda, cpu);
-		if (ACCESS_ONCE(rdp->qlen)) {
+		if (is_nocb_cpu(cpu)) {
+			_rcu_barrier_trace(rsp, "OnlineNoCB", cpu,
+					   rsp->n_barrier_done);
+			atomic_inc(&rsp->barrier_cpu_count);
+			__call_rcu(&rdp->barrier_head, rcu_barrier_callback,
+				   rsp, cpu, 0);
+		} else if (ACCESS_ONCE(rdp->qlen)) {
 			_rcu_barrier_trace(rsp, "OnlineQ", cpu,
 					   rsp->n_barrier_done);
 			smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -2708,6 +2740,7 @@  rcu_boot_init_percpu_data(int cpu, struct rcu_state *rsp)
 #endif
 	rdp->cpu = cpu;
 	rdp->rsp = rsp;
+	rcu_boot_init_nocb_percpu_data(rdp);
 	raw_spin_unlock_irqrestore(&rnp->lock, flags);
 }
 
@@ -2789,6 +2822,7 @@  static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	struct rcu_data *rdp = per_cpu_ptr(rcu_state->rda, cpu);
 	struct rcu_node *rnp = rdp->mynode;
 	struct rcu_state *rsp;
+	int ret = NOTIFY_OK;
 
 	trace_rcu_utilization("Start CPU hotplug");
 	switch (action) {
@@ -2802,7 +2836,10 @@  static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 		rcu_boost_kthread_setaffinity(rnp, -1);
 		break;
 	case CPU_DOWN_PREPARE:
-		rcu_boost_kthread_setaffinity(rnp, cpu);
+		if (nocb_cpu_expendable(cpu))
+			rcu_boost_kthread_setaffinity(rnp, cpu);
+		else
+			ret = NOTIFY_BAD;
 		break;
 	case CPU_DYING:
 	case CPU_DYING_FROZEN:
@@ -2826,7 +2863,7 @@  static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 		break;
 	}
 	trace_rcu_utilization("End CPU hotplug");
-	return NOTIFY_OK;
+	return ret;
 }
 
 /*
@@ -2846,6 +2883,7 @@  static int __init rcu_spawn_gp_kthread(void)
 		raw_spin_lock_irqsave(&rnp->lock, flags);
 		rsp->gp_kthread = t;
 		raw_spin_unlock_irqrestore(&rnp->lock, flags);
+		rcu_spawn_nocb_kthreads(rsp);
 	}
 	return 0;
 }
@@ -3041,6 +3079,7 @@  void __init rcu_init(void)
 	rcu_init_one(&rcu_sched_state, &rcu_sched_data);
 	rcu_init_one(&rcu_bh_state, &rcu_bh_data);
 	__rcu_init_preempt();
+	rcu_init_nocb();
 	 open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
 
 	/*
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index ce938b3..79954bb 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -317,6 +317,18 @@  struct rcu_data {
 	struct rcu_head oom_head;
 #endif /* #ifdef CONFIG_RCU_FAST_NO_HZ */
 
+	/* 7) Callback offloading. */
+#ifdef CONFIG_RCU_NOCB_CPU
+	struct rcu_head *nocb_head;	/* CBs waiting for kthread. */
+	struct rcu_head **nocb_tail;
+	atomic_long_t nocb_q_count;	/* # CBs waiting for kthread */
+	atomic_long_t nocb_q_count_lazy; /*  (approximate). */
+	int nocb_p_count;		/* # CBs being invoked by kthread */
+	int nocb_p_count_lazy;		/*  (approximate). */
+	wait_queue_head_t nocb_wq;	/* For nocb kthreads to sleep on. */
+	struct task_struct *nocb_kthread;
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
 	int cpu;
 	struct rcu_state *rsp;
 };
@@ -364,6 +376,12 @@  struct rcu_state {
 	struct rcu_data __percpu *rda;		/* pointer of percu rcu_data. */
 	void (*call)(struct rcu_head *head,	/* call_rcu() flavor. */
 		     void (*func)(struct rcu_head *head));
+#ifdef CONFIG_RCU_NOCB_CPU
+	void (*call_remote)(struct rcu_head *head,
+		     void (*func)(struct rcu_head *head));
+						/* call_rcu() flavor, but for */
+						/*  placing on remote CPU. */
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 
 	/* The following fields are guarded by the root rcu_node's lock. */
 
@@ -434,6 +452,8 @@  struct rcu_state {
 #define RCU_GP_FLAG_FQS  0x2	/* Need grace-period quiescent-state forcing. */
 
 extern struct list_head rcu_struct_flavors;
+
+/* Sequence through rcu_state structures for each RCU flavor. */
 #define for_each_rcu_flavor(rsp) \
 	list_for_each_entry((rsp), &rcu_struct_flavors, flavors)
 
@@ -510,5 +530,32 @@  static void print_cpu_stall_info(struct rcu_state *rsp, int cpu);
 static void print_cpu_stall_info_end(void);
 static void zero_cpu_stall_ticks(struct rcu_data *rdp);
 static void increment_cpu_stall_ticks(void);
+static bool is_nocb_cpu(int cpu);
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+			    bool lazy);
+static bool rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+				      struct rcu_data *rdp);
+static bool nocb_cpu_expendable(int cpu);
+static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
+static void rcu_spawn_nocb_kthreads(struct rcu_state *rsp);
+static void init_nocb_callback_list(struct rcu_data *rdp);
+static void __init rcu_init_nocb(void);
 
 #endif /* #ifndef RCU_TREE_NONCORE */
+
+#ifdef CONFIG_RCU_TRACE
+#ifdef CONFIG_RCU_NOCB_CPU
+/* Sum up queue lengths for tracing. */
+static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
+{
+	*ql = atomic_long_read(&rdp->nocb_q_count) + rdp->nocb_p_count;
+	*qll = atomic_long_read(&rdp->nocb_q_count_lazy) + rdp->nocb_p_count_lazy;
+}
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
+{
+	*ql = 0;
+	*qll = 0;
+}
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
+#endif /* #ifdef CONFIG_RCU_TRACE */
diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
index 986f41d..ea960c4 100644
--- a/kernel/rcutree_plugin.h
+++ b/kernel/rcutree_plugin.h
@@ -25,6 +25,7 @@ 
  */
 
 #include <linux/delay.h>
+#include <linux/gfp.h>
 #include <linux/oom.h>
 #include <linux/smpboot.h>
 
@@ -36,6 +37,14 @@ 
 #define RCU_BOOST_PRIO RCU_KTHREAD_PRIO
 #endif
 
+#ifdef CONFIG_RCU_NOCB_CPU
+static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
+static bool have_rcu_nocb_mask;	    /* Was rcu_nocb_mask allocated? */
+static bool rcu_nocb_poll;	    /* Offload kthreads are to poll. */
+module_param(rcu_nocb_poll, bool, 0444);
+static char __initdata nocb_buf[NR_CPUS * 5];
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
 /*
  * Check the RCU kernel configuration parameters and print informative
  * messages about anything out of the ordinary.  If you like #ifdef, you
@@ -76,6 +85,18 @@  static void __init rcu_bootup_announce_oddness(void)
 		printk(KERN_INFO "\tExperimental boot-time adjustment of leaf fanout to %d.\n", rcu_fanout_leaf);
 	if (nr_cpu_ids != NR_CPUS)
 		printk(KERN_INFO "\tRCU restricting CPUs from NR_CPUS=%d to nr_cpu_ids=%d.\n", NR_CPUS, nr_cpu_ids);
+#ifdef CONFIG_RCU_NOCB_CPU
+	if (have_rcu_nocb_mask) {
+		if (cpumask_test_cpu(0, rcu_nocb_mask)) {
+			cpumask_clear_cpu(0, rcu_nocb_mask);
+			pr_info("\tCPU 0: illegal no-CBs CPU (cleared).\n");
+		}
+		cpulist_scnprintf(nocb_buf, sizeof(nocb_buf), rcu_nocb_mask);
+		pr_info("\tExperimental no-CBs CPUs: %s.\n", nocb_buf);
+		if (rcu_nocb_poll)
+			pr_info("\tExperimental polled no-CBs CPUs.\n");
+	}
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 }
 
 #ifdef CONFIG_TREE_PREEMPT_RCU
@@ -642,7 +663,7 @@  static void rcu_preempt_do_callbacks(void)
  */
 void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_preempt_state, 0);
+	__call_rcu(head, func, &rcu_preempt_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
@@ -656,7 +677,7 @@  EXPORT_SYMBOL_GPL(call_rcu);
 void kfree_call_rcu(struct rcu_head *head,
 		    void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_preempt_state, 1);
+	__call_rcu(head, func, &rcu_preempt_state, -1, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
@@ -1025,7 +1046,7 @@  static void rcu_preempt_check_callbacks(int cpu)
 void kfree_call_rcu(struct rcu_head *head,
 		    void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_sched_state, 1);
+	__call_rcu(head, func, &rcu_sched_state, -1, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
@@ -2104,3 +2125,373 @@  static void increment_cpu_stall_ticks(void)
 }
 
 #endif /* #else #ifdef CONFIG_RCU_CPU_STALL_INFO */
+
+#ifdef CONFIG_RCU_NOCB_CPU
+
+/*
+ * Offload callback processing from the boot-time-specified set of CPUs
+ * specified by rcu_nocb_mask.  For each CPU in the set, there is a
+ * kthread created that pulls the callbacks from the corresponding CPU,
+ * waits for a grace period to elapse, and invokes the callbacks.
+ * The no-CBs CPUs do a wake_up() on their kthread when they insert
+ * a callback into any empty list, unless the rcu_nocb_poll boot parameter
+ * has been specified, in which case each kthread actively polls its
+ * CPU.  (Which isn't so great for energy efficiency, but which does
+ * reduce RCU's overhead on that CPU.)
+ *
+ * This is intended to be used in conjunction with Frederic Weisbecker's
+ * adaptive-idle work, which would seriously reduce OS jitter on CPUs
+ * running CPU-bound user-mode computations.
+ *
+ * Offloading of callback processing could also in theory be used as
+ * an energy-efficiency measure because CPUs with no RCU callbacks
+ * queued are more aggressive about entering dyntick-idle mode.
+ */
+
+
+/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
+static int __init rcu_nocb_setup(char *str)
+{
+	alloc_bootmem_cpumask_var(&rcu_nocb_mask);
+	have_rcu_nocb_mask = true;
+	cpulist_parse(str, rcu_nocb_mask);
+	return 1;
+}
+__setup("rcu_nocbs=", rcu_nocb_setup);
+
+/* Is the specified CPU a no-CBs CPU? */
+static bool is_nocb_cpu(int cpu)
+{
+	if (have_rcu_nocb_mask)
+		return cpumask_test_cpu(cpu, rcu_nocb_mask);
+	return false;
+}
+
+/*
+ * Enqueue the specified string of rcu_head structures onto the specified
+ * CPU's no-CBs lists.  The CPU is specified by rdp, the head of the
+ * string by rhp, and the tail of the string by rhtp.  The non-lazy/lazy
+ * counts are supplied by rhcount and rhcount_lazy.
+ *
+ * If warranted, also wake up the kthread servicing this CPU's queues.
+ */
+static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
+				    struct rcu_head *rhp,
+				    struct rcu_head **rhtp,
+				    int rhcount, int rhcount_lazy)
+{
+	int len;
+	struct rcu_head **old_rhpp;
+	struct task_struct *t;
+
+	/* Enqueue the callback on the nocb list and update counts. */
+	old_rhpp = xchg(&rdp->nocb_tail, rhtp);
+	ACCESS_ONCE(*old_rhpp) = rhp;
+	atomic_long_add(rhcount, &rdp->nocb_q_count);
+	atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
+
+	/* If we are not being polled and there is a kthread, awaken it ... */
+	t = ACCESS_ONCE(rdp->nocb_kthread);
+	if (rcu_nocb_poll | !t)
+		return;
+	len = atomic_long_read(&rdp->nocb_q_count);
+	if (old_rhpp == &rdp->nocb_head) {
+		wake_up(&rdp->nocb_wq); /* ... only if queue was empty ... */
+		rdp->qlen_last_fqs_check = 0;
+	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
+		wake_up_process(t); /* ... or if many callbacks queued. */
+		rdp->qlen_last_fqs_check = LONG_MAX / 2;
+	}
+	return;
+}
+
+/*
+ * This is a helper for __call_rcu(), which invokes this when the normal
+ * callback queue is inoperable.  If this is not a no-CBs CPU, this
+ * function returns failure back to __call_rcu(), which can complain
+ * appropriately.
+ *
+ * Otherwise, this function queues the callback where the corresponding
+ * "rcuo" kthread can find it.
+ */
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+			    bool lazy)
+{
+
+	if (!is_nocb_cpu(rdp->cpu))
+		return 0;
+	__call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy);
+	return 1;
+}
+
+/*
+ * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is
+ * not a no-CBs CPU.
+ */
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+						     struct rcu_data *rdp)
+{
+	long ql = rsp->qlen;
+	long qll = rsp->qlen_lazy;
+
+	/* If this is not a no-CBs CPU, tell the caller to do it the old way. */
+	if (!is_nocb_cpu(smp_processor_id()))
+		return 0;
+	rsp->qlen = 0;
+	rsp->qlen_lazy = 0;
+
+	/* First, enqueue the donelist, if any.  This preserves CB ordering. */
+	if (rsp->orphan_donelist != NULL) {
+		__call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist,
+					rsp->orphan_donetail, ql, qll);
+		ql = qll = 0;
+		rsp->orphan_donelist = NULL;
+		rsp->orphan_donetail = &rsp->orphan_donelist;
+	}
+	if (rsp->orphan_nxtlist != NULL) {
+		__call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist,
+					rsp->orphan_nxttail, ql, qll);
+		ql = qll = 0;
+		rsp->orphan_nxtlist = NULL;
+		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+	}
+	return 1;
+}
+
+/*
+ * There must be at least one non-no-CBs CPU in operation at any given
+ * time, because no-CBs CPUs are not capable of initiating grace periods
+ * independently.  This function therefore complains if the specified
+ * CPU is the last non-no-CBs CPU, allowing the CPU-hotplug system to
+ * avoid offlining the last such CPU.  (Recursion is a wonderful thing,
+ * but you have to have a base case!)
+ */
+static bool nocb_cpu_expendable(int cpu)
+{
+	cpumask_var_t non_nocb_cpus;
+	int ret;
+
+	/*
+	 * If there are no no-CB CPUs or if this CPU is not a no-CB CPU,
+	 * then offlining this CPU is harmless.  Let it happen.
+	 */
+	if (!have_rcu_nocb_mask || is_nocb_cpu(cpu))
+		return 1;
+
+	/* If no memory, play it safe and keep the CPU around. */
+	if (!alloc_cpumask_var(&non_nocb_cpus, GFP_NOIO))
+		return 0;
+	cpumask_andnot(non_nocb_cpus, cpu_online_mask, rcu_nocb_mask);
+	cpumask_clear_cpu(cpu, non_nocb_cpus);
+	ret = !cpumask_empty(non_nocb_cpus);
+	free_cpumask_var(non_nocb_cpus);
+	return ret;
+}
+
+/*
+ * Helper structure for remote registry of RCU callbacks.
+ * This is needed for when a no-CBs CPU needs to start a grace period.
+ * If it just invokes call_rcu(), the resulting callback will be queued,
+ * which can result in deadlock.
+ */
+struct rcu_head_remote {
+	struct rcu_head *rhp;
+	call_rcu_func_t *crf;
+	void (*func)(struct rcu_head *rhp);
+};
+
+/*
+ * Register a callback as specified by the rcu_head_remote struct.
+ * This function is intended to be invoked via smp_call_function_single().
+ */
+static void call_rcu_local(void *arg)
+{
+	struct rcu_head_remote *rhrp =
+		container_of(arg, struct rcu_head_remote, rhp);
+
+	rhrp->crf(rhrp->rhp, rhrp->func);
+}
+
+/*
+ * Set up an rcu_head_remote structure and the invoke call_rcu_local()
+ * on CPU 0 (which is guaranteed to be a non-no-CBs CPU) via
+ * smp_call_function_single().
+ */
+static void invoke_crf_remote(struct rcu_head *rhp,
+			      void (*func)(struct rcu_head *rhp),
+			      call_rcu_func_t crf)
+{
+	struct rcu_head_remote rhr;
+
+	rhr.rhp = rhp;
+	rhr.crf = crf;
+	rhr.func = func;
+	smp_call_function_single(0, call_rcu_local, &rhr, 1);
+}
+
+/*
+ * Helper functions to be passed to wait_rcu_gp(), each of which
+ * invokes invoke_crf_remote() to register a callback appropriately.
+ */
+static void __maybe_unused
+call_rcu_preempt_remote(struct rcu_head *rhp,
+			void (*func)(struct rcu_head *rhp))
+{
+	invoke_crf_remote(rhp, func, call_rcu);
+}
+static void call_rcu_bh_remote(struct rcu_head *rhp,
+			       void (*func)(struct rcu_head *rhp))
+{
+	invoke_crf_remote(rhp, func, call_rcu_bh);
+}
+static void call_rcu_sched_remote(struct rcu_head *rhp,
+				  void (*func)(struct rcu_head *rhp))
+{
+	invoke_crf_remote(rhp, func, call_rcu_sched);
+}
+
+/*
+ * Per-rcu_data kthread, but only for no-CBs CPUs.  Each kthread invokes
+ * callbacks queued by the corresponding no-CBs CPU.
+ */
+static int rcu_nocb_kthread(void *arg)
+{
+	int c, cl;
+	struct rcu_head *list;
+	struct rcu_head *next;
+	struct rcu_head **tail;
+	struct rcu_data *rdp = arg;
+
+	/* Each pass through this loop invokes one batch of callbacks */
+	for (;;) {
+		/* If not polling, wait for next batch of callbacks. */
+		if (!rcu_nocb_poll)
+			wait_event(rdp->nocb_wq, rdp->nocb_head);
+		list = ACCESS_ONCE(rdp->nocb_head);
+		if (!list) {
+			schedule_timeout_interruptible(1);
+			continue;
+		}
+
+		/*
+		 * Extract queued callbacks, update counts, and wait
+		 * for a grace period to elapse.
+		 */
+		ACCESS_ONCE(rdp->nocb_head) = NULL;
+		tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
+		c = atomic_long_xchg(&rdp->nocb_q_count, 0);
+		cl = atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
+		ACCESS_ONCE(rdp->nocb_p_count) += c;
+		ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
+		wait_rcu_gp(rdp->rsp->call_remote);
+
+		/* Each pass through the following loop invokes a callback. */
+		trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
+		c = cl = 0;
+		while (list) {
+			next = list->next;
+			/* Wait for enqueuing to complete, if needed. */
+			while (next == NULL && &list->next != tail) {
+				schedule_timeout_interruptible(1);
+				next = list->next;
+			}
+			debug_rcu_head_unqueue(list);
+			local_bh_disable();
+			if (__rcu_reclaim(rdp->rsp->name, list))
+				cl++;
+			c++;
+			local_bh_enable();
+			list = next;
+		}
+		trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
+		ACCESS_ONCE(rdp->nocb_p_count) -= c;
+		ACCESS_ONCE(rdp->nocb_p_count_lazy) -= cl;
+		rdp->n_cbs_invoked += c;
+	}
+	return 0;
+}
+
+/* Initialize per-rcu_data variables for no-CBs CPUs. */
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+	rdp->nocb_tail = &rdp->nocb_head;
+	init_waitqueue_head(&rdp->nocb_wq);
+}
+
+/* Create a kthread for each RCU flavor for each no-CBs CPU. */
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+	int cpu;
+	struct rcu_data *rdp;
+	struct task_struct *t;
+
+	if (rcu_nocb_mask == NULL)
+		return;
+	for_each_cpu(cpu, rcu_nocb_mask) {
+		rdp = per_cpu_ptr(rsp->rda, cpu);
+		t = kthread_run(rcu_nocb_kthread, rdp, "rcuo%d", cpu);
+		BUG_ON(IS_ERR(t));
+		ACCESS_ONCE(rdp->nocb_kthread) = t;
+	}
+}
+
+/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+	if (rcu_nocb_mask == NULL ||
+	    !cpumask_test_cpu(rdp->cpu, rcu_nocb_mask))
+		return;
+	rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+}
+
+/* Initialize the ->call_remote fields in the rcu_state structures. */
+static void __init rcu_init_nocb(void)
+{
+#ifdef CONFIG_PREEMPT_RCU
+	rcu_preempt_state.call_remote = call_rcu_preempt_remote;
+#endif /* #ifdef CONFIG_PREEMPT_RCU */
+	rcu_bh_state.call_remote = call_rcu_bh_remote;
+	rcu_sched_state.call_remote = call_rcu_sched_remote;
+}
+
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+
+static bool is_nocb_cpu(int cpu)
+{
+	return false;
+}
+
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+			    bool lazy)
+{
+	return 0;
+}
+
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+						     struct rcu_data *rdp)
+{
+	return 0;
+}
+
+static bool nocb_cpu_expendable(int cpu)
+{
+	return 1;
+}
+
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+}
+
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_init_nocb(void)
+{
+}
+
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index e84679c..5e9ca3e 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -111,6 +111,8 @@  static char convert_kthread_status(unsigned int kthread_status)
 
 static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
 {
+	long ql, qll;
+
 	if (!rdp->beenonline)
 		return;
 	seq_printf(m, "%3d%cc=%lu g=%lu pq=%d qp=%d",
@@ -124,8 +126,11 @@  static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
 		   rdp->dynticks->dynticks_nmi_nesting,
 		   rdp->dynticks_fqs);
 	seq_printf(m, " of=%lu", rdp->offline_fqs);
+	rcu_nocb_q_lengths(rdp, &ql, &qll);
+	qll += rdp->qlen_lazy;
+	ql += rdp->qlen;
 	seq_printf(m, " ql=%ld/%ld qs=%c%c%c%c",
-		   rdp->qlen_lazy, rdp->qlen,
+		   qll, ql,
 		   ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] !=
 			rdp->nxttail[RCU_NEXT_TAIL]],
 		   ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=