@@ -31,6 +31,7 @@ typedef enum QmpCommandOptions
typedef struct QmpCommand
{
const char *name;
+ /* Runs in coroutine context if QCO_COROUTINE is set */
QmpCommandFunc *fn;
QmpCommandOptions options;
QTAILQ_ENTRY(QmpCommand) node;
@@ -155,7 +155,9 @@ static inline bool monitor_is_qmp(const Monitor *mon)
typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
extern IOThread *mon_iothread;
-extern QEMUBH *qmp_dispatcher_bh;
+extern Coroutine *qmp_dispatcher_co;
+extern bool qmp_dispatcher_co_shutdown;
+extern bool qmp_dispatcher_co_busy;
extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
extern QemuMutex monitor_lock;
extern MonitorList mon_list;
@@ -173,7 +175,7 @@ void monitor_fdsets_cleanup(void);
void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
void monitor_data_destroy_qmp(MonitorQMP *mon);
-void monitor_qmp_bh_dispatcher(void *data);
+void coroutine_fn monitor_qmp_dispatcher_co(void *data);
int get_monitor_def(int64_t *pval, const char *name);
void help_cmd(Monitor *mon, const char *name);
@@ -55,8 +55,32 @@ typedef struct {
/* Shared monitor I/O thread */
IOThread *mon_iothread;
-/* Bottom half to dispatch the requests received from I/O thread */
-QEMUBH *qmp_dispatcher_bh;
+/* Coroutine to dispatch the requests received from I/O thread */
+Coroutine *qmp_dispatcher_co;
+
+/* Set to true when the dispatcher coroutine should terminate */
+bool qmp_dispatcher_co_shutdown;
+
+/*
+ * qmp_dispatcher_co_busy is used for synchronisation between the
+ * monitor thread and the main thread to ensure that the dispatcher
+ * coroutine never gets scheduled a second time when it's already
+ * scheduled (scheduling the same coroutine twice is forbidden).
+ *
+ * It is true if the coroutine is active and processing requests.
+ * Additional requests may then be pushed onto mon->qmp_requests,
+ * and @qmp_dispatcher_co_shutdown may be set without further ado.
+ * @qmp_dispatcher_co_busy must not be woken up in this case.
+ *
+ * If false, you also have to set @qmp_dispatcher_co_busy to true and
+ * wake up @qmp_dispatcher_co after pushing the new requests.
+ *
+ * The coroutine will automatically change this variable back to false
+ * before it yields. Nobody else may set the variable to false.
+ *
+ * Access must be atomic for thread safety.
+ */
+bool qmp_dispatcher_co_busy;
/*
* Protects mon_list, monitor_qapi_event_state, coroutine_mon,
@@ -623,9 +647,24 @@ void monitor_cleanup(void)
}
qemu_mutex_unlock(&monitor_lock);
- /* QEMUBHs needs to be deleted before destroying the I/O thread */
- qemu_bh_delete(qmp_dispatcher_bh);
- qmp_dispatcher_bh = NULL;
+ /*
+ * The dispatcher needs to stop before destroying the I/O thread.
+ *
+ * We need to poll both qemu_aio_context and iohandler_ctx to make
+ * sure that the dispatcher coroutine keeps making progress and
+ * eventually terminates. qemu_aio_context is automatically
+ * polled by calling AIO_WAIT_WHILE on it, but we must poll
+ * iohandler_ctx manually.
+ */
+ qmp_dispatcher_co_shutdown = true;
+ if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
+ aio_co_wake(qmp_dispatcher_co);
+ }
+
+ AIO_WAIT_WHILE(qemu_get_aio_context(),
+ (aio_poll(iohandler_get_aio_context(), false),
+ atomic_mb_read(&qmp_dispatcher_co_busy)));
+
if (mon_iothread) {
iothread_destroy(mon_iothread);
mon_iothread = NULL;
@@ -649,9 +688,9 @@ void monitor_init_globals_core(void)
* have commands assuming that context. It would be nice to get
* rid of those assumptions.
*/
- qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
- monitor_qmp_bh_dispatcher,
- NULL);
+ qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
+ atomic_mb_set(&qmp_dispatcher_co_busy, true);
+ aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
}
int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp)
@@ -133,6 +133,10 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
}
}
+/*
+ * Runs outside of coroutine context for OOB commands, but in
+ * coroutine context for everything else.
+ */
static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
{
QDict *rsp;
@@ -205,43 +209,99 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
return req_obj;
}
-void monitor_qmp_bh_dispatcher(void *data)
+void coroutine_fn monitor_qmp_dispatcher_co(void *data)
{
- QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
+ QMPRequest *req_obj = NULL;
QDict *rsp;
bool need_resume;
MonitorQMP *mon;
- if (!req_obj) {
- return;
- }
+ while (true) {
+ assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
- mon = req_obj->mon;
- /* qmp_oob_enabled() might change after "qmp_capabilities" */
- need_resume = !qmp_oob_enabled(mon) ||
- mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
- qemu_mutex_unlock(&mon->qmp_queue_lock);
- if (req_obj->req) {
- QDict *qdict = qobject_to(QDict, req_obj->req);
- QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
- trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
- monitor_qmp_dispatch(mon, req_obj->req);
- } else {
- assert(req_obj->err);
- rsp = qmp_error_response(req_obj->err);
- req_obj->err = NULL;
- monitor_qmp_respond(mon, rsp);
- qobject_unref(rsp);
- }
+ /*
+ * Mark the dispatcher as not busy already here so that we
+ * don't miss any new requests coming in the middle of our
+ * processing.
+ */
+ atomic_mb_set(&qmp_dispatcher_co_busy, false);
+
+ while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
+ /*
+ * No more requests to process. Wait to be reentered from
+ * handle_qmp_command() when it pushes more requests, or
+ * from monitor_cleanup() when it requests shutdown.
+ */
+ if (!qmp_dispatcher_co_shutdown) {
+ qemu_coroutine_yield();
+
+ /*
+ * busy must be set to true again by whoever
+ * rescheduled us to avoid double scheduling
+ */
+ assert(atomic_xchg(&qmp_dispatcher_co_busy, false) == true);
+ }
+
+ /*
+ * qmp_dispatcher_co_shutdown may have changed if we
+ * yielded and were reentered from monitor_cleanup()
+ */
+ if (qmp_dispatcher_co_shutdown) {
+ return;
+ }
+ }
- if (need_resume) {
- /* Pairs with the monitor_suspend() in handle_qmp_command() */
- monitor_resume(&mon->common);
- }
- qmp_request_free(req_obj);
+ if (atomic_xchg(&qmp_dispatcher_co_busy, true) == true) {
+ /*
+ * Someone rescheduled us (probably because a new requests
+ * came in), but we didn't actually yield. Do that now,
+ * only to be immediately reentered and removed from the
+ * list of scheduled coroutines.
+ */
+ qemu_coroutine_yield();
+ }
- /* Reschedule instead of looping so the main loop stays responsive */
- qemu_bh_schedule(qmp_dispatcher_bh);
+ /*
+ * Move the coroutine from iohandler_ctx to qemu_aio_context for
+ * executing the command handler so that it can make progress if it
+ * involves an AIO_WAIT_WHILE().
+ */
+ aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
+ qemu_coroutine_yield();
+
+ mon = req_obj->mon;
+ /* qmp_oob_enabled() might change after "qmp_capabilities" */
+ need_resume = !qmp_oob_enabled(mon) ||
+ mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
+ qemu_mutex_unlock(&mon->qmp_queue_lock);
+ if (req_obj->req) {
+ QDict *qdict = qobject_to(QDict, req_obj->req);
+ QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
+ trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
+ monitor_qmp_dispatch(mon, req_obj->req);
+ } else {
+ assert(req_obj->err);
+ rsp = qmp_error_response(req_obj->err);
+ req_obj->err = NULL;
+ monitor_qmp_respond(mon, rsp);
+ qobject_unref(rsp);
+ }
+
+ if (need_resume) {
+ /* Pairs with the monitor_suspend() in handle_qmp_command() */
+ monitor_resume(&mon->common);
+ }
+ qmp_request_free(req_obj);
+
+ /*
+ * Yield and reschedule so the main loop stays responsive.
+ *
+ * Move back to iohandler_ctx so that nested event loops for
+ * qemu_aio_context don't start new monitor commands.
+ */
+ aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
+ qemu_coroutine_yield();
+ }
}
static void handle_qmp_command(void *opaque, QObject *req, Error *err)
@@ -302,7 +362,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
qemu_mutex_unlock(&mon->qmp_queue_lock);
/* Kick the dispatcher routine */
- qemu_bh_schedule(qmp_dispatcher_bh);
+ if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
+ aio_co_wake(qmp_dispatcher_co);
+ }
}
static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
@@ -12,12 +12,16 @@
*/
#include "qemu/osdep.h"
+
+#include "block/aio.h"
#include "qapi/error.h"
#include "qapi/qmp/dispatch.h"
#include "qapi/qmp/qdict.h"
#include "qapi/qmp/qjson.h"
#include "sysemu/runstate.h"
#include "qapi/qmp/qbool.h"
+#include "qemu/coroutine.h"
+#include "qemu/main-loop.h"
static QDict *qmp_dispatch_check_obj(QDict *dict, bool allow_oob,
Error **errp)
@@ -88,6 +92,30 @@ bool qmp_is_oob(const QDict *dict)
&& !qdict_haskey(dict, "execute");
}
+typedef struct QmpDispatchBH {
+ const QmpCommand *cmd;
+ Monitor *cur_mon;
+ QDict *args;
+ QObject **ret;
+ Error **errp;
+ Coroutine *co;
+} QmpDispatchBH;
+
+static void do_qmp_dispatch_bh(void *opaque)
+{
+ QmpDispatchBH *data = opaque;
+
+ assert(monitor_cur() == NULL);
+ monitor_set_cur(qemu_coroutine_self(), data->cur_mon);
+ data->cmd->fn(data->args, data->ret, data->errp);
+ monitor_set_cur(qemu_coroutine_self(), NULL);
+ aio_co_wake(data->co);
+}
+
+/*
+ * Runs outside of coroutine context for OOB commands, but in coroutine
+ * context for everything else.
+ */
QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
bool allow_oob, Monitor *cur_mon)
{
@@ -153,12 +181,35 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
qobject_ref(args);
}
+ assert(!(oob && qemu_in_coroutine()));
assert(monitor_cur() == NULL);
- monitor_set_cur(qemu_coroutine_self(), cur_mon);
-
- cmd->fn(args, &ret, &err);
-
- monitor_set_cur(qemu_coroutine_self(), NULL);
+ if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) {
+ monitor_set_cur(qemu_coroutine_self(), cur_mon);
+ cmd->fn(args, &ret, &err);
+ monitor_set_cur(qemu_coroutine_self(), NULL);
+ } else {
+ /*
+ * Not being in coroutine context implies that we're handling
+ * an OOB command, which must not have QCO_COROUTINE.
+ *
+ * This implies that we are in coroutine context, but the
+ * command doesn't have QCO_COROUTINE. We must drop out of
+ * coroutine context for this one.
+ */
+ assert(!oob && qemu_in_coroutine() && !(cmd->options & QCO_COROUTINE));
+
+ QmpDispatchBH data = {
+ .cur_mon = cur_mon,
+ .cmd = cmd,
+ .args = args,
+ .ret = &ret,
+ .errp = &err,
+ .co = qemu_coroutine_self(),
+ };
+ aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh,
+ &data);
+ qemu_coroutine_yield();
+ }
qobject_unref(args);
if (err) {
/* or assert(!ret) after reviewing all handlers: */
@@ -20,6 +20,9 @@ void qmp_register_command(QmpCommandList *cmds, const char *name,
{
QmpCommand *cmd = g_malloc0(sizeof(*cmd));
+ /* QCO_COROUTINE and QCO_ALLOW_OOB are incompatible for now */
+ assert(!((options & QCO_COROUTINE) && (options & QCO_ALLOW_OOB)));
+
cmd->name = name;
cmd->fn = fn;
cmd->enabled = true;
@@ -15,6 +15,7 @@
#include "qemu/osdep.h"
#include "block/block.h"
+#include "qemu/main-loop.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
@@ -558,8 +559,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
* There cannot be two concurrent aio_poll calls for the same AioContext (or
* an aio_poll concurrent with a GSource prepare/check/dispatch callback).
* We rely on this below to avoid slow locked accesses to ctx->notify_me.
+ *
+ * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
+ * is special in that it runs in the main thread, but that thread's context
+ * is qemu_aio_context.
*/
- assert(in_aio_context_home_thread(ctx));
+ assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
+ qemu_get_aio_context() : ctx));
qemu_lockcnt_inc(&ctx->list_lock);