From patchwork Wed Sep 5 21:39:45 2012 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Paul E. McKenney" X-Patchwork-Id: 11200 Return-Path: X-Original-To: patchwork@peony.canonical.com Delivered-To: patchwork@peony.canonical.com Received: from fiordland.canonical.com (fiordland.canonical.com [91.189.94.145]) by peony.canonical.com (Postfix) with ESMTP id 9BE5E23F26 for ; Wed, 5 Sep 2012 21:39:59 +0000 (UTC) Received: from mail-iy0-f180.google.com (mail-iy0-f180.google.com [209.85.210.180]) by fiordland.canonical.com (Postfix) with ESMTP id B27A4A1867B for ; Wed, 5 Sep 2012 21:39:04 +0000 (UTC) Received: by iafj25 with SMTP id j25so1188893iaf.11 for ; Wed, 05 Sep 2012 14:39:58 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=20120113; h=x-forwarded-to:x-forwarded-for:delivered-to:received-spf:date:from :to:cc:subject:message-id:reply-to:mime-version:content-type :content-disposition:user-agent:x-content-scanned:x-cbid :x-gm-message-state; bh=p3WlecyVZ0iJdyyNuR5HIZwQnaSSRuDiynAD67xwePg=; b=BfK1bYni5A0V+sXkE71guQCd7drLOSw5YDD4ZS1ZjjYdqqMVYQPNdwam6m6A76jGrt fyJqeNsOP78bf8aKc47Oen6l0J3naeTxfB2wOXlLqv9YGv556Cy73rdO+I4LoIbznsIR Qt/ZSSO+YP4P6zQcTlGml+T5YL0LZmfrcr+Nyg7s32cIsz7ysclrLSm8yMJM3HUSS8Ua H/kNfZXN6sGZS9WOV7vD9KIaus9BlRPyQOJ3FTPnFixUzw9TYriDl5RMZZY4R0cAYtT8 cHm++N7YcFIqKdArUq6qB8poImU21qSOt3reSFqMr5aICxKM/8Set40NeUT9AWGuU0W2 hFPw== Received: by 10.50.10.201 with SMTP id k9mr19715615igb.28.1346881198296; Wed, 05 Sep 2012 14:39:58 -0700 (PDT) X-Forwarded-To: linaro-patchwork@canonical.com X-Forwarded-For: patch@linaro.org linaro-patchwork@canonical.com Delivered-To: patches@linaro.org Received: by 10.50.184.232 with SMTP id ex8csp281033igc; Wed, 5 Sep 2012 14:39:57 -0700 (PDT) Received: by 10.182.216.99 with SMTP id op3mr18085578obc.85.1346881197006; Wed, 05 Sep 2012 14:39:57 -0700 (PDT) Received: from e36.co.us.ibm.com (e36.co.us.ibm.com. [32.97.110.154]) by mx.google.com with ESMTPS id k6si94707obd.211.2012.09.05.14.39.56 (version=TLSv1/SSLv3 cipher=OTHER); Wed, 05 Sep 2012 14:39:56 -0700 (PDT) Received-SPF: pass (google.com: domain of paulmck@linux.vnet.ibm.com designates 32.97.110.154 as permitted sender) client-ip=32.97.110.154; Authentication-Results: mx.google.com; spf=pass (google.com: domain of paulmck@linux.vnet.ibm.com designates 32.97.110.154 as permitted sender) smtp.mail=paulmck@linux.vnet.ibm.com Received: from /spool/local by e36.co.us.ibm.com with IBM ESMTP SMTP Gateway: Authorized Use Only! Violators will be prosecuted for from ; Wed, 5 Sep 2012 15:39:55 -0600 Received: from d03dlp01.boulder.ibm.com (9.17.202.177) by e36.co.us.ibm.com (192.168.1.136) with IBM ESMTP SMTP Gateway: Authorized Use Only! Violators will be prosecuted; Wed, 5 Sep 2012 15:39:53 -0600 Received: from d03relay01.boulder.ibm.com (d03relay01.boulder.ibm.com [9.17.195.226]) by d03dlp01.boulder.ibm.com (Postfix) with ESMTP id C1C12C40002 for ; Wed, 5 Sep 2012 15:39:50 -0600 (MDT) Received: from d03av01.boulder.ibm.com (d03av01.boulder.ibm.com [9.17.195.167]) by d03relay01.boulder.ibm.com (8.13.8/8.13.8/NCO v10.0) with ESMTP id q85Ldqfk227454 for ; Wed, 5 Sep 2012 15:39:52 -0600 Received: from d03av01.boulder.ibm.com (loopback [127.0.0.1]) by d03av01.boulder.ibm.com (8.14.4/8.13.1/NCO v10.0 AVout) with ESMTP id q85Ldk6w008121 for ; Wed, 5 Sep 2012 15:39:51 -0600 Received: from paulmck-ThinkPad-W500 ([9.47.24.133]) by d03av01.boulder.ibm.com (8.14.4/8.13.1/NCO v10.0 AVin) with ESMTP id q85LdjUE008099; Wed, 5 Sep 2012 15:39:45 -0600 Received: by paulmck-ThinkPad-W500 (Postfix, from userid 1000) id 687A7E4D79; Wed, 5 Sep 2012 14:39:45 -0700 (PDT) Date: Wed, 5 Sep 2012 14:39:45 -0700 From: "Paul E. McKenney" To: linux-kernel@vger.kernel.org Cc: mingo@elte.hu, laijs@cn.fujitsu.com, dipankar@in.ibm.com, akpm@linux-foundation.org, mathieu.desnoyers@efficios.com, josh@joshtriplett.org, niv@us.ibm.com, tglx@linutronix.de, peterz@infradead.org, rostedt@goodmis.org, Valdis.Kletnieks@vt.edu, dhowells@redhat.com, eric.dumazet@gmail.com, darren@dvhart.com, fweisbec@gmail.com, sbw@mit.edu, patches@linaro.org Subject: [PATCH RFC tip/core/rcu] Add callback-free CPUs Message-ID: <20120905213945.GA15216@linux.vnet.ibm.com> Reply-To: paulmck@linux.vnet.ibm.com MIME-Version: 1.0 Content-Disposition: inline User-Agent: Mutt/1.5.21 (2010-09-15) X-Content-Scanned: Fidelis XPS MAILER x-cbid: 12090521-7606-0000-0000-00000362A1BD X-Gm-Message-State: ALoCoQn+7OaSzHOg1NL4YSVMw4F6GNXwjBoUiUGrOY2DqRTzP9+sONBGiwsPINc4I6uCuXPf3Phz RCU callback execution can add significant OS jitter and also can degrade scheduling latency. This commit therefore adds the ability for selected CPUs ("rcu_nocbs=" boot parameter) to have their callbacks offloaded to kthreads. If the "rcu_nocb_poll" boot parameter is also specified, these kthreads will do polling, removing the need for the offloaded CPUs to do wakeups. At least one CPU must be doing normal callback processing: currently CPU 0 cannot be selected as a no-CBs CPU. In addition, attempts to offline the last normal-CBs CPU will fail. This is an experimental patch, so just FYI for the moment. Known shortcomings include: o The counters should be atomic_long_t rather than atomic_t. o No-CBs CPUs can be configured only at boot time. o Only a modest number of CPUs can be configured as no-CBs CPUs. Definitely a few tens, perhaps a few hundred, but no way thousands. o At least one CPU must remain a normal-CBs CPU. o Not much in the way of energy-efficiency features, though there are some natural energy savings inherent in the implementation o The per-no-CBs-CPU kthreads are not subject to RCU priority boosting. o Care is required when setting the kthreads to RT priority. Later versions will address some of them, but others are likely to remain. Signed-off-by: Paul E. McKenney diff --git a/include/trace/events/rcu.h b/include/trace/events/rcu.h index 5bde94d..d4f559b 100644 --- a/include/trace/events/rcu.h +++ b/include/trace/events/rcu.h @@ -549,6 +549,7 @@ TRACE_EVENT(rcu_torture_read, * "EarlyExit": rcu_barrier_callback() piggybacked, thus early exit. * "Inc1": rcu_barrier_callback() piggyback check counter incremented. * "Offline": rcu_barrier_callback() found offline CPU + * "OnlineNoCB": rcu_barrier_callback() found online no-CBs CPU. * "OnlineQ": rcu_barrier_callback() found online CPU with callbacks. * "OnlineNQ": rcu_barrier_callback() found online CPU, no callbacks. * "IRQ": An rcu_barrier_callback() callback posted on remote CPU. diff --git a/init/Kconfig b/init/Kconfig index c26b8a1..ccda03a 100644 --- a/init/Kconfig +++ b/init/Kconfig @@ -598,6 +598,25 @@ config RCU_BOOST_DELAY Accept the default if unsure. +config RCU_NOCB_CPU + bool "Offload RCU callback processing from boot-selected CPUs" + depends on TREE_RCU || TREE_PREEMPT_RCU + default n + help + Use this option to reduce OS jitter for aggressive HPC or + real-time workloads. + + This option offloads callback invocation from the set of CPUs + specified at boot time by the rcu_nocbs parameter. For each + such CPU, a kthread ("rcuoN") will be created to invoke callbacks. + Nothing prevents this kthread from running on of of the specified + CPUs, but (1) the kthreads may be preempted between each callback + and (2) affinity or cgroups can be used to force the kthreads off + of those CPUs if desired. + + Say Y here if you want reduced OS jitter on selected CPUs. + Say N here if you are unsure. + endmenu # "RCU Subsystem" config IKCONFIG diff --git a/kernel/rcutree.c b/kernel/rcutree.c index 791aea0..cfc5a91 100644 --- a/kernel/rcutree.c +++ b/kernel/rcutree.c @@ -303,7 +303,8 @@ EXPORT_SYMBOL_GPL(rcu_sched_force_quiescent_state); static int cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp) { - return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL]; + return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] && + rdp->nxttail[RCU_DONE_TAIL] != NULL; } /* @@ -312,7 +313,9 @@ cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp) static int cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp) { - return *rdp->nxttail[RCU_DONE_TAIL] && !rcu_gp_in_progress(rsp); + return rdp->nxttail[RCU_DONE_TAIL] && + *rdp->nxttail[RCU_DONE_TAIL] && + !rcu_gp_in_progress(rsp); } /* @@ -1071,6 +1074,7 @@ static void init_callback_list(struct rcu_data *rdp) rdp->nxtlist = NULL; for (i = 0; i < RCU_NEXT_SIZE; i++) rdp->nxttail[i] = &rdp->nxtlist; + init_nocb_callback_list(rdp); } /* @@ -1560,6 +1564,10 @@ static void rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp) { + /* No-CBs CPUs do not have orphanable callbacks. */ + if (is_nocb_cpu(rdp->cpu)) + return; + /* * Orphan the callbacks. First adjust the counts. This is safe * because ->onofflock excludes _rcu_barrier()'s adoption of @@ -1611,6 +1619,10 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) int i; struct rcu_data *rdp = __this_cpu_ptr(rsp->rda); + /* No-CBs CPUs are handled specially. */ + if (rcu_nocb_adopt_orphan_cbs(rsp, rdp)) + return; + /* Do the accounting first. */ rdp->qlen_lazy += rsp->qlen_lazy; rdp->qlen += rsp->qlen; @@ -2087,9 +2099,15 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp, } } +/* + * Helper function for call_rcu() and friends. The cpu argument will + * normally be -1, indicating "currently running CPU". It may specify + * a CPU only if that CPU is a no-CBs CPU. Currently, only _rcu_barrier() + * is expected to specify a CPU. + */ static void __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu), - struct rcu_state *rsp, bool lazy) + struct rcu_state *rsp, int cpu, bool lazy) { unsigned long flags; struct rcu_data *rdp; @@ -2109,9 +2127,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu), rdp = this_cpu_ptr(rsp->rda); /* Add the callback to our list. */ - if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL)) { + if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) { + int offline; + + if (cpu != -1) + rdp = per_cpu_ptr(rsp->rda, cpu); + offline = !__call_rcu_nocb(rdp, head, lazy); + WARN_ON_ONCE(offline); /* _call_rcu() is illegal on offline CPU; leak the callback. */ - WARN_ON_ONCE(1); local_irq_restore(flags); return; } @@ -2140,7 +2163,7 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu), */ void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu)) { - __call_rcu(head, func, &rcu_sched_state, 0); + __call_rcu(head, func, &rcu_sched_state, -1, 0); } EXPORT_SYMBOL_GPL(call_rcu_sched); @@ -2149,7 +2172,7 @@ EXPORT_SYMBOL_GPL(call_rcu_sched); */ void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu)) { - __call_rcu(head, func, &rcu_bh_state, 0); + __call_rcu(head, func, &rcu_bh_state, -1, 0); } EXPORT_SYMBOL_GPL(call_rcu_bh); @@ -2538,9 +2561,17 @@ static void _rcu_barrier(struct rcu_state *rsp) * When that callback is invoked, we will know that all of the * corresponding CPU's preceding callbacks have been invoked. */ - for_each_online_cpu(cpu) { + for_each_possible_cpu(cpu) { + if (!cpu_online(cpu) && !is_nocb_cpu(cpu)) + continue; rdp = per_cpu_ptr(rsp->rda, cpu); - if (ACCESS_ONCE(rdp->qlen)) { + if (is_nocb_cpu(cpu)) { + _rcu_barrier_trace(rsp, "OnlineNoCB", cpu, + rsp->n_barrier_done); + atomic_inc(&rsp->barrier_cpu_count); + __call_rcu(&rdp->barrier_head, rcu_barrier_callback, + rsp, cpu, 0); + } else if (ACCESS_ONCE(rdp->qlen)) { _rcu_barrier_trace(rsp, "OnlineQ", cpu, rsp->n_barrier_done); smp_call_function_single(cpu, rcu_barrier_func, rsp, 1); @@ -2614,6 +2645,7 @@ rcu_boot_init_percpu_data(int cpu, struct rcu_state *rsp) #endif rdp->cpu = cpu; rdp->rsp = rsp; + rcu_boot_init_nocb_percpu_data(rdp); raw_spin_unlock_irqrestore(&rnp->lock, flags); } @@ -2699,6 +2731,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self, struct rcu_data *rdp = per_cpu_ptr(rcu_state->rda, cpu); struct rcu_node *rnp = rdp->mynode; struct rcu_state *rsp; + int ret = NOTIFY_OK; trace_rcu_utilization("Start CPU hotplug"); switch (action) { @@ -2713,8 +2746,12 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self, rcu_cpu_kthread_setrt(cpu, 1); break; case CPU_DOWN_PREPARE: - rcu_node_kthread_setaffinity(rnp, cpu); - rcu_cpu_kthread_setrt(cpu, 0); + if (nocb_cpu_expendable(cpu)) { + rcu_node_kthread_setaffinity(rnp, cpu); + rcu_cpu_kthread_setrt(cpu, 0); + } else { + ret = NOTIFY_BAD; + } break; case CPU_DYING: case CPU_DYING_FROZEN: @@ -2738,7 +2775,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self, break; } trace_rcu_utilization("End CPU hotplug"); - return NOTIFY_OK; + return ret; } /* @@ -2758,6 +2795,7 @@ static int __init rcu_spawn_gp_kthread(void) raw_spin_lock_irqsave(&rnp->lock, flags); rsp->gp_kthread = t; raw_spin_unlock_irqrestore(&rnp->lock, flags); + rcu_spawn_nocb_kthreads(rsp); } return 0; } @@ -2952,6 +2990,7 @@ void __init rcu_init(void) rcu_init_one(&rcu_sched_state, &rcu_sched_data); rcu_init_one(&rcu_bh_state, &rcu_bh_data); __rcu_init_preempt(); + rcu_init_nocb(); open_softirq(RCU_SOFTIRQ, rcu_process_callbacks); /* diff --git a/kernel/rcutree.h b/kernel/rcutree.h index 499d661..5e5568d 100644 --- a/kernel/rcutree.h +++ b/kernel/rcutree.h @@ -323,6 +323,18 @@ struct rcu_data { struct rcu_head oom_head; #endif /* #ifdef CONFIG_RCU_FAST_NO_HZ */ + /* 7) Callback offloading. */ +#ifdef CONFIG_RCU_NOCB_CPU + struct rcu_head *nocb_head; /* CBs waiting for kthread. */ + struct rcu_head **nocb_tail; + atomic_t nocb_q_count; /* # CBs waiting for kthread */ + atomic_t nocb_q_count_lazy; /* (approximate). */ + int nocb_p_count; /* # CBs being invoked by kthread */ + int nocb_p_count_lazy; /* (approximate). */ + wait_queue_head_t nocb_wq; /* For nocb kthreads to sleep on. */ + struct task_struct *nocb_kthread; +#endif /* #ifdef CONFIG_RCU_NOCB_CPU */ + int cpu; struct rcu_state *rsp; }; @@ -375,6 +387,12 @@ struct rcu_state { struct rcu_data __percpu *rda; /* pointer of percu rcu_data. */ void (*call)(struct rcu_head *head, /* call_rcu() flavor. */ void (*func)(struct rcu_head *head)); +#ifdef CONFIG_RCU_NOCB_CPU + void (*call_remote)(struct rcu_head *head, + void (*func)(struct rcu_head *head)); + /* call_rcu() flavor, but for */ + /* placing on remote CPU. */ +#endif /* #ifdef CONFIG_RCU_NOCB_CPU */ /* The following fields are guarded by the root rcu_node's lock. */ @@ -428,6 +446,8 @@ struct rcu_state { #define RCU_GP_FLAG_FQS 0x2 /* Need grace-period quiescent-state forcing. */ extern struct list_head rcu_struct_flavors; + +/* Sequence through rcu_state structures for each RCU flavor. */ #define for_each_rcu_flavor(rsp) \ list_for_each_entry((rsp), &rcu_struct_flavors, flavors) @@ -511,5 +531,32 @@ static void print_cpu_stall_info(struct rcu_state *rsp, int cpu); static void print_cpu_stall_info_end(void); static void zero_cpu_stall_ticks(struct rcu_data *rdp); static void increment_cpu_stall_ticks(void); +static bool is_nocb_cpu(int cpu); +static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, + bool lazy); +static bool rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp, + struct rcu_data *rdp); +static bool nocb_cpu_expendable(int cpu); +static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp); +static void rcu_spawn_nocb_kthreads(struct rcu_state *rsp); +static void init_nocb_callback_list(struct rcu_data *rdp); +static void __init rcu_init_nocb(void); #endif /* #ifndef RCU_TREE_NONCORE */ + +#ifdef CONFIG_RCU_TRACE +#ifdef CONFIG_RCU_NOCB_CPU +/* Sum up queue lengths for tracing. */ +static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll) +{ + *ql = atomic_read(&rdp->nocb_q_count) + rdp->nocb_p_count; + *qll = atomic_read(&rdp->nocb_q_count_lazy) + rdp->nocb_p_count_lazy; +} +#else /* #ifdef CONFIG_RCU_NOCB_CPU */ +static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll) +{ + *ql = 0; + *qll = 0; +} +#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */ +#endif /* #ifdef CONFIG_RCU_TRACE */ diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h index 5384eda..2341ddd 100644 --- a/kernel/rcutree_plugin.h +++ b/kernel/rcutree_plugin.h @@ -35,6 +35,13 @@ #define RCU_BOOST_PRIO RCU_KTHREAD_PRIO #endif +#ifdef CONFIG_RCU_NOCB_CPU +static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */ +static bool have_rcu_nocb_mask; /* Was rcu_nocb_mask allocated? */ +static bool rcu_nocb_poll; /* Offload kthread are to poll. */ +module_param(rcu_nocb_poll, bool, 0444); +#endif /* #ifdef CONFIG_RCU_NOCB_CPU */ + /* * Check the RCU kernel configuration parameters and print informative * messages about anything out of the ordinary. If you like #ifdef, you @@ -75,6 +82,20 @@ static void __init rcu_bootup_announce_oddness(void) printk(KERN_INFO "\tExperimental boot-time adjustment of leaf fanout to %d.\n", rcu_fanout_leaf); if (nr_cpu_ids != NR_CPUS) printk(KERN_INFO "\tRCU restricting CPUs from NR_CPUS=%d to nr_cpu_ids=%d.\n", NR_CPUS, nr_cpu_ids); +#ifdef CONFIG_RCU_NOCB_CPU + if (have_rcu_nocb_mask) { + char buf[NR_CPUS * 5]; + + if (cpumask_test_cpu(0, rcu_nocb_mask)) { + cpumask_clear_cpu(0, rcu_nocb_mask); + pr_info("\tCPU 0: illegal no-CBs CPU (cleared).\n"); + } + cpulist_scnprintf(buf, sizeof(buf), rcu_nocb_mask); + pr_info("\tExperimental no-CBs CPUs: %s.\n", buf); + if (rcu_nocb_poll) + pr_info("\tExperimental polled no-CBs CPUs.\n"); + } +#endif /* #ifdef CONFIG_RCU_NOCB_CPU */ } #ifdef CONFIG_TREE_PREEMPT_RCU @@ -641,7 +662,7 @@ static void rcu_preempt_do_callbacks(void) */ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu)) { - __call_rcu(head, func, &rcu_preempt_state, 0); + __call_rcu(head, func, &rcu_preempt_state, -1, 0); } EXPORT_SYMBOL_GPL(call_rcu); @@ -655,7 +676,7 @@ EXPORT_SYMBOL_GPL(call_rcu); void kfree_call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu)) { - __call_rcu(head, func, &rcu_preempt_state, 1); + __call_rcu(head, func, &rcu_preempt_state, -1, 1); } EXPORT_SYMBOL_GPL(kfree_call_rcu); @@ -1012,7 +1033,7 @@ static void rcu_preempt_check_callbacks(int cpu) void kfree_call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu)) { - __call_rcu(head, func, &rcu_sched_state, 1); + __call_rcu(head, func, &rcu_sched_state, -1, 1); } EXPORT_SYMBOL_GPL(kfree_call_rcu); @@ -2347,3 +2368,373 @@ static void increment_cpu_stall_ticks(void) } #endif /* #else #ifdef CONFIG_RCU_CPU_STALL_INFO */ + +#ifdef CONFIG_RCU_NOCB_CPU + +/* + * Offload callback processing from the boot-time-specified set of CPUs + * specified by rcu_nocb_mask. For each CPU in the set, there is a + * kthread created that pulls the callbacks from the corresponding CPU, + * waits for a grace period to elapse, and invokes the callbacks. + * The no-CBs CPUs do a wake_up() on their kthread when they insert + * a callback into any empty list, unless the rcu_nocb_poll boot parameter + * has been specified, in which case each kthread actively polls its + * CPU. (Which isn't so great for energy efficiency, but which does + * reduce RCU's overhead on that CPU.) + * + * This is intended to be used in conjunction with Frederic Weisbecker's + * adaptive-idle work, which would seriously reduce OS jitter on CPUs + * running CPU-bound user-mode computations. + * + * Offloading of callback processing could also in theory be used as + * an energy-efficiency measure because CPUs with no RCU callbacks + * queued are more aggressive about entering dyntick-idle mode. + */ + + +/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */ +static int __init rcu_nocb_setup(char *str) +{ + alloc_bootmem_cpumask_var(&rcu_nocb_mask); + have_rcu_nocb_mask = true; + cpulist_parse(str, rcu_nocb_mask); + return 1; +} +__setup("rcu_nocbs=", rcu_nocb_setup); + +/* Is the specified CPU a no-CPUs CPU? */ +static bool is_nocb_cpu(int cpu) +{ + if (have_rcu_nocb_mask) + return cpumask_test_cpu(cpu, rcu_nocb_mask); + return false; +} + +/* + * Enqueue the specified string of rcu_head structures onto the specified + * CPU's no-CBs lists. The CPU is specified by rdp, the head of the + * string by rhp, and the tail of the string by rhtp. The non-lazy/lazy + * counts are supplied by rhcount and rhcount_lazy. + * + * If warranted, also wake up the kthread servicing this CPUs queues. + */ +static void __call_rcu_nocb_enqueue(struct rcu_data *rdp, + struct rcu_head *rhp, + struct rcu_head **rhtp, + int rhcount, int rhcount_lazy) +{ + int len; + struct rcu_head **old_rhpp; + struct task_struct *t; + + /* Enqueue the callback on the nocb list and update counts. */ + old_rhpp = xchg(&rdp->nocb_tail, rhtp); + ACCESS_ONCE(*old_rhpp) = rhp; + atomic_add(rhcount, &rdp->nocb_q_count); + atomic_add(rhcount_lazy, &rdp->nocb_q_count_lazy); + + /* If we are not being polled and there is a kthread, awaken it ... */ + t = ACCESS_ONCE(rdp->nocb_kthread); + if (rcu_nocb_poll | !t) + return; + len = atomic_read(&rdp->nocb_q_count); + if (old_rhpp == &rdp->nocb_head) { + wake_up(&rdp->nocb_wq); /* ... only if queue was empty ... */ + rdp->qlen_last_fqs_check = 0; + } else if (len > rdp->qlen_last_fqs_check + qhimark) { + wake_up_process(t); /* ... or if many callbacks queued. */ + rdp->qlen_last_fqs_check = LONG_MAX / 2; + } + return; +} + +/* + * This is a helper for __call_rcu(), which invokes this when the normal + * callback queue is inoperable. If this is not a no-CBs CPU, this + * function returns failure back to __call_rcu(), which can complain + * appropriately. + * + * Otherwise, this function queues the callback where the corresponding + * "rcuo" kthread can find it. + */ +static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, + bool lazy) +{ + + if (!is_nocb_cpu(rdp->cpu)) + return 0; + __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy); + return 1; +} + +/* + * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is + * not a no-CBs CPU. + */ +static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp, + struct rcu_data *rdp) +{ + long ql = rsp->qlen; + long qll = rsp->qlen_lazy; + + /* If this is not a no-CBs CPU, tell the caller to do it the old way. */ + if (!is_nocb_cpu(smp_processor_id())) + return 0; + rsp->qlen = 0; + rsp->qlen_lazy = 0; + + /* First, enqueue the donelist, if any. This preserves CB ordering. */ + if (rsp->orphan_donelist != NULL) { + __call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist, + rsp->orphan_donetail, ql, qll); + ql = qll = 0; + rsp->orphan_donelist = NULL; + rsp->orphan_donetail = &rsp->orphan_donelist; + } + if (rsp->orphan_nxtlist != NULL) { + __call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist, + rsp->orphan_nxttail, ql, qll); + ql = qll = 0; + rsp->orphan_nxtlist = NULL; + rsp->orphan_nxttail = &rsp->orphan_nxtlist; + } + return 1; +} + +/* + * There must be at least one non-no-CBs CPU in operation at any given + * time, because no-CBs CPUs are not capable of initiating grace periods + * independently. This function therefore complains if the specified + * CPU is the last non-no-CBs CPU, allowing the CPU-hotplug system to + * avoid offlining the last such CPU. (Recursion is a wonderful thing, + * but you have to have a base case!) + */ +static bool nocb_cpu_expendable(int cpu) +{ + cpumask_var_t non_nocb_cpus; + int ret; + + /* + * If there are no no-CB CPUs or if this CPU is not a no-CB CPU, + * then offlining this CPU is harmless. Let it happen. + */ + if (!have_rcu_nocb_mask || is_nocb_cpu(cpu)) + return 1; + + /* If no memory, play it safe and keep the CPU around. */ + if (!alloc_cpumask_var(&non_nocb_cpus, GFP_NOIO)) + return 0; + cpumask_andnot(non_nocb_cpus, cpu_online_mask, rcu_nocb_mask); + cpumask_clear_cpu(cpu, non_nocb_cpus); + ret = !cpumask_empty(non_nocb_cpus); + free_cpumask_var(non_nocb_cpus); + return ret; +} + +/* + * Helper structure for remote registry of RCU callbacks. + * This is needed for when a no-CBs CPU needs to start a grace period. + * If it just invokes call_rcu(), the resulting callback will be queued, + * which can result in deadlock. + */ +struct rcu_head_remote { + struct rcu_head *rhp; + call_rcu_func_t *crf; + void (*func)(struct rcu_head *rhp); +}; + +/* + * Register a callback as specified by the rcu_head_remote struct. + * This function is intended to be invoked via smp_call_function_single(). + */ +static void call_rcu_local(void *arg) +{ + struct rcu_head_remote *rhrp = + container_of(arg, struct rcu_head_remote, rhp); + + rhrp->crf(rhrp->rhp, rhrp->func); +} + +/* + * Set up an rcu_head_remote structure and the invoke call_rcu_local() + * on CPU 0 (which is guaranteed to be a non-no-CBs CPU) via + * smp_call_function_single(). + */ +static void invoke_crf_remote(struct rcu_head *rhp, + void (*func)(struct rcu_head *rhp), + call_rcu_func_t crf) +{ + struct rcu_head_remote rhr; + + rhr.rhp = rhp; + rhr.crf = crf; + rhr.func = func; + smp_call_function_single(0, call_rcu_local, &rhr, 1); +} + +/* + * Helper functions to be passed to wait_rcu_gp(), each of which + * invokes invoke_crf_remote() to register a callback appropriately. + */ +static void __maybe_unused +call_rcu_preempt_remote(struct rcu_head *rhp, + void (*func)(struct rcu_head *rhp)) +{ + invoke_crf_remote(rhp, func, call_rcu); +} +static void call_rcu_bh_remote(struct rcu_head *rhp, + void (*func)(struct rcu_head *rhp)) +{ + invoke_crf_remote(rhp, func, call_rcu_bh); +} +static void call_rcu_sched_remote(struct rcu_head *rhp, + void (*func)(struct rcu_head *rhp)) +{ + invoke_crf_remote(rhp, func, call_rcu_sched); +} + +/* + * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes + * callbacks queued by the corresponding no-CBs CPU. + */ +static int rcu_nocb_kthread(void *arg) +{ + int c, cl; + struct rcu_head *list; + struct rcu_head *next; + struct rcu_head **tail; + struct rcu_data *rdp = arg; + + /* Each pass through this loop invokes one batch of callbacks */ + for (;;) { + /* If not polling, wait for next batch of callbacks. */ + if (!rcu_nocb_poll) + wait_event(rdp->nocb_wq, rdp->nocb_head); + list = ACCESS_ONCE(rdp->nocb_head); + if (!list) { + schedule_timeout_interruptible(1); + continue; + } + + /* + * Extract queued callbacks, update counts, and wait + * for a grace period to elapse. + */ + ACCESS_ONCE(rdp->nocb_head) = NULL; + tail = xchg(&rdp->nocb_tail, &rdp->nocb_head); + c = atomic_xchg(&rdp->nocb_q_count, 0); + cl = atomic_xchg(&rdp->nocb_q_count_lazy, 0); + ACCESS_ONCE(rdp->nocb_p_count) += c; + ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl; + wait_rcu_gp(rdp->rsp->call_remote); + + /* Each pass through the following loop invokes a callback. */ + trace_rcu_batch_start(rdp->rsp->name, cl, c, -1); + c = cl = 0; + while (list) { + next = list->next; + /* Wait for enqueuing to complete, if needed. */ + while (next == NULL && &list->next != tail) { + schedule_timeout_interruptible(1); + next = list->next; + } + debug_rcu_head_unqueue(list); + local_bh_disable(); + if (__rcu_reclaim(rdp->rsp->name, list)) + cl++; + c++; + local_bh_enable(); + list = next; + } + trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1); + ACCESS_ONCE(rdp->nocb_p_count) -= c; + ACCESS_ONCE(rdp->nocb_p_count_lazy) -= cl; + rdp->n_cbs_invoked += c; + } + return 0; +} + +/* Initialize per-rcu_data variables for no-CBs CPUs. */ +static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) +{ + rdp->nocb_tail = &rdp->nocb_head; + init_waitqueue_head(&rdp->nocb_wq); +} + +/* Create a kthread for each RCU flavor for each no-CBs CPU. */ +static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp) +{ + int cpu; + struct rcu_data *rdp; + struct task_struct *t; + + if (rcu_nocb_mask == NULL) + return; + for_each_cpu(cpu, rcu_nocb_mask) { + rdp = per_cpu_ptr(rsp->rda, cpu); + t = kthread_run(rcu_nocb_kthread, rdp, "rcuo%d", cpu); + BUG_ON(IS_ERR(t)); + ACCESS_ONCE(rdp->nocb_kthread) = t; + } +} + +/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */ +static void init_nocb_callback_list(struct rcu_data *rdp) +{ + if (rcu_nocb_mask == NULL || + !cpumask_test_cpu(rdp->cpu, rcu_nocb_mask)) + return; + rdp->nxttail[RCU_NEXT_TAIL] = NULL; +} + +/* Initialize the ->call_remote fields in the rcu_state structures. */ +static void __init rcu_init_nocb(void) +{ +#ifdef CONFIG_PREEMPT_RCU + rcu_preempt_state.call_remote = call_rcu_preempt_remote; +#endif /* #ifdef CONFIG_PREEMPT_RCU */ + rcu_bh_state.call_remote = call_rcu_bh_remote; + rcu_sched_state.call_remote = call_rcu_sched_remote; +} + +#else /* #ifdef CONFIG_RCU_NOCB_CPU */ + +static bool is_nocb_cpu(int cpu) +{ + return false; +} + +static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, + bool lazy) +{ + return 0; +} + +static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp, + struct rcu_data *rdp) +{ + return 0; +} + +static bool nocb_cpu_expendable(int cpu) +{ + return 1; +} + +static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) +{ +} + +static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp) +{ +} + +static void init_nocb_callback_list(struct rcu_data *rdp) +{ +} + +static void __init rcu_init_nocb(void) +{ +} + +#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */ diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c index 7340efd..4ce0f91 100644 --- a/kernel/rcutree_trace.c +++ b/kernel/rcutree_trace.c @@ -84,6 +84,8 @@ static char convert_kthread_status(unsigned int kthread_status) static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp) { + long ql, qll; + if (!rdp->beenonline) return; seq_printf(m, "%3d%cc=%lu g=%lu pq=%d qp=%d", @@ -97,8 +99,11 @@ static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp) rdp->dynticks->dynticks_nmi_nesting, rdp->dynticks_fqs); seq_printf(m, " of=%lu", rdp->offline_fqs); + rcu_nocb_q_lengths(rdp, &ql, &qll); + qll += rdp->qlen_lazy; + ql += rdp->qlen; seq_printf(m, " ql=%ld/%ld qs=%c%c%c%c", - rdp->qlen_lazy, rdp->qlen, + qll, ql, ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] != rdp->nxttail[RCU_NEXT_TAIL]], ".R"[rdp->nxttail[RCU_WAIT_TAIL] != @@ -147,6 +152,8 @@ static const struct file_operations rcudata_fops = { static void print_one_rcu_data_csv(struct seq_file *m, struct rcu_data *rdp) { + long ql, qll; + if (!rdp->beenonline) return; seq_printf(m, "%d,%s,%lu,%lu,%d,%d", @@ -160,7 +167,10 @@ static void print_one_rcu_data_csv(struct seq_file *m, struct rcu_data *rdp) rdp->dynticks->dynticks_nmi_nesting, rdp->dynticks_fqs); seq_printf(m, ",%lu", rdp->offline_fqs); - seq_printf(m, ",%ld,%ld,\"%c%c%c%c\"", rdp->qlen_lazy, rdp->qlen, + rcu_nocb_q_lengths(rdp, &ql, &qll); + qll += rdp->qlen_lazy; + ql += rdp->qlen; + seq_printf(m, ",%ld,%ld,\"%c%c%c%c\"", qll, ql, ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] != rdp->nxttail[RCU_NEXT_TAIL]], ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=