Message ID: 20200909151149.490589-1-kwolf@redhat.com
Series: monitor: Optionally run handlers in coroutines
Patchew URL: https://patchew.org/QEMU/20200909151149.490589-1-kwolf@redhat.com/

Hi,

This series failed build test on FreeBSD host. Please find the details below.

=== TEST SCRIPT BEGIN ===
#!/bin/bash
# Testing script will be invoked under the git checkout with
# HEAD pointing to a commit that has the patches applied on top of "base"
# branch
if qemu-system-x86_64 --help >/dev/null 2>&1; then
  QEMU=qemu-system-x86_64
elif /usr/libexec/qemu-kvm --help >/dev/null 2>&1; then
  QEMU=/usr/libexec/qemu-kvm
else
  exit 1
fi
make vm-build-freebsd J=21 QEMU=$QEMU
exit 0
=== TEST SCRIPT END ===

The full log is available at
http://patchew.org/logs/20200909151149.490589-1-kwolf@redhat.com/testing.FreeBSD/?type=message.
---
Email generated automatically by Patchew [https://patchew.org/].
Please send your feedback to patchew-devel@redhat.com
Stefan Hajnoczi <stefanha@redhat.com> writes:

> On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
>> Some QMP command handlers can block the main loop for a relatively long
>> time, for example because they perform some I/O. This is quite nasty.
>> Allowing such handlers to run in a coroutine where they can yield (and
>> therefore release the BQL) while waiting for an event such as I/O
>> completion solves the problem.
>>
>> This series adds the infrastructure to allow this and switches
>> block_resize to run in a coroutine as a first example.
>>
>> This is an alternative solution to Marc-André's "monitor: add
>> asynchronous command type" series.
>
> Please clarify the following in the QAPI documentation:
> * Is the QMP monitor suspended while the command is pending?
> * Are QMP events reported while the command is pending?

Good points. Kevin, I'd be willing to take this as a follow-up patch,
if that's more convenient for you.

> Acked-by: Stefan Hajnoczi <stefanha@redhat.com>

Stefan, I could use your proper review of PATCH 11-13. Pretty-please?
Kevin Wolf <kwolf@redhat.com> writes:

> This moves the QMP dispatcher to a coroutine and runs all QMP command
> handlers that declare 'coroutine': true in coroutine context so they
> can avoid blocking the main loop while doing I/O or waiting for other
> events.
>
> For commands that are not declared safe to run in a coroutine, the
> dispatcher drops out of coroutine context by calling the QMP command
> handler from a bottom half.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> Reviewed-by: Markus Armbruster <armbru@redhat.com>
> ---
>  include/qapi/qmp/dispatch.h |   1 +
>  monitor/monitor-internal.h  |   6 +-
>  monitor/monitor.c           |  55 +++++++++++++---
>  monitor/qmp.c               | 122 +++++++++++++++++++++++++++---------
>  qapi/qmp-dispatch.c         |  61 ++++++++++++++++--
>  qapi/qmp-registry.c         |   3 +
>  util/aio-posix.c            |   8 ++-
>  7 files changed, 210 insertions(+), 46 deletions(-)
>
> diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
> index 9fd2b720a7..af8d96c570 100644
> --- a/include/qapi/qmp/dispatch.h
> +++ b/include/qapi/qmp/dispatch.h
> @@ -31,6 +31,7 @@ typedef enum QmpCommandOptions
>  typedef struct QmpCommand
>  {
>      const char *name;
> +    /* Runs in coroutine context if QCO_COROUTINE is set */
>      QmpCommandFunc *fn;
>      QmpCommandOptions options;
>      QTAILQ_ENTRY(QmpCommand) node;
> diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h
> index b39e03b744..b55d6df07f 100644
> --- a/monitor/monitor-internal.h
> +++ b/monitor/monitor-internal.h
> @@ -155,7 +155,9 @@ static inline bool monitor_is_qmp(const Monitor *mon)
>
>  typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
>  extern IOThread *mon_iothread;
> -extern QEMUBH *qmp_dispatcher_bh;
> +extern Coroutine *qmp_dispatcher_co;
> +extern bool qmp_dispatcher_co_shutdown;
> +extern bool qmp_dispatcher_co_busy;
>  extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
>  extern QemuMutex monitor_lock;
>  extern MonitorList mon_list;
> @@ -173,7 +175,7 @@ void monitor_fdsets_cleanup(void);
>
>  void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
>  void monitor_data_destroy_qmp(MonitorQMP *mon);
> -void monitor_qmp_bh_dispatcher(void *data);
> +void coroutine_fn monitor_qmp_dispatcher_co(void *data);
>
>  int get_monitor_def(int64_t *pval, const char *name);
>  void help_cmd(Monitor *mon, const char *name);
> diff --git a/monitor/monitor.c b/monitor/monitor.c
> index 629aa073ee..ac2722bf91 100644
> --- a/monitor/monitor.c
> +++ b/monitor/monitor.c
> @@ -55,8 +55,32 @@ typedef struct {
>  /* Shared monitor I/O thread */
>  IOThread *mon_iothread;
>
> -/* Bottom half to dispatch the requests received from I/O thread */
> -QEMUBH *qmp_dispatcher_bh;
> +/* Coroutine to dispatch the requests received from I/O thread */
> +Coroutine *qmp_dispatcher_co;
> +
> +/* Set to true when the dispatcher coroutine should terminate */
> +bool qmp_dispatcher_co_shutdown;
> +
> +/*
> + * qmp_dispatcher_co_busy is used for synchronisation between the
> + * monitor thread and the main thread to ensure that the dispatcher
> + * coroutine never gets scheduled a second time when it's already
> + * scheduled (scheduling the same coroutine twice is forbidden).
> + *
> + * It is true if the coroutine is active and processing requests.
> + * Additional requests may then be pushed onto mon->qmp_requests,
> + * and @qmp_dispatcher_co_shutdown may be set without further ado.
> + * @qmp_dispatcher_co_busy must not be woken up in this case.
> + *
> + * If false, you also have to set @qmp_dispatcher_co_busy to true and
> + * wake up @qmp_dispatcher_co after pushing the new requests.
> + *
> + * The coroutine will automatically change this variable back to false
> + * before it yields. Nobody else may set the variable to false.
> + *
> + * Access must be atomic for thread safety.
> + */
> +bool qmp_dispatcher_co_busy;
>
>  /*
>   * Protects mon_list, monitor_qapi_event_state, coroutine_mon,
> @@ -623,9 +647,24 @@ void monitor_cleanup(void)
>      }
>      qemu_mutex_unlock(&monitor_lock);
>
> -    /* QEMUBHs needs to be deleted before destroying the I/O thread */
> -    qemu_bh_delete(qmp_dispatcher_bh);
> -    qmp_dispatcher_bh = NULL;
> +    /*
> +     * The dispatcher needs to stop before destroying the I/O thread.
> +     *
> +     * We need to poll both qemu_aio_context and iohandler_ctx to make
> +     * sure that the dispatcher coroutine keeps making progress and
> +     * eventually terminates. qemu_aio_context is automatically
> +     * polled by calling AIO_WAIT_WHILE on it, but we must poll
> +     * iohandler_ctx manually.
> +     */
> +    qmp_dispatcher_co_shutdown = true;
> +    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> +        aio_co_wake(qmp_dispatcher_co);
> +    }
> +
> +    AIO_WAIT_WHILE(qemu_get_aio_context(),
> +                   (aio_poll(iohandler_get_aio_context(), false),
> +                    atomic_mb_read(&qmp_dispatcher_co_busy)));
> +
>      if (mon_iothread) {
>          iothread_destroy(mon_iothread);
>          mon_iothread = NULL;
> @@ -649,9 +688,9 @@ void monitor_init_globals_core(void)
>       * have commands assuming that context. It would be nice to get
>       * rid of those assumptions.
>       */
> -    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
> -                                   monitor_qmp_bh_dispatcher,
> -                                   NULL);
> +    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
> +    atomic_mb_set(&qmp_dispatcher_co_busy, true);
> +    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
>  }
>
>  int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp)
> diff --git a/monitor/qmp.c b/monitor/qmp.c
> index 922fdb5541..69f6e93f38 100644
> --- a/monitor/qmp.c
> +++ b/monitor/qmp.c
> @@ -133,6 +133,10 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
>      }
>  }
>
> +/*
> + * Runs outside of coroutine context for OOB commands, but in
> + * coroutine context for everything else.
> + */
>  static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
>  {
>      QDict *rsp;
> @@ -205,43 +209,99 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
>      return req_obj;
>  }
>
> -void monitor_qmp_bh_dispatcher(void *data)
> +void coroutine_fn monitor_qmp_dispatcher_co(void *data)
>  {
> -    QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
> +    QMPRequest *req_obj = NULL;
>      QDict *rsp;
>      bool need_resume;
>      MonitorQMP *mon;
>
> -    if (!req_obj) {
> -        return;
> -    }
> +    while (true) {
> +        assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
>
> -    mon = req_obj->mon;
> -    /* qmp_oob_enabled() might change after "qmp_capabilities" */
> -    need_resume = !qmp_oob_enabled(mon) ||
> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
> -    if (req_obj->req) {
> -        QDict *qdict = qobject_to(QDict, req_obj->req);
> -        QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> -        trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> -        monitor_qmp_dispatch(mon, req_obj->req);
> -    } else {
> -        assert(req_obj->err);
> -        rsp = qmp_error_response(req_obj->err);
> -        req_obj->err = NULL;
> -        monitor_qmp_respond(mon, rsp);
> -        qobject_unref(rsp);
> -    }
> +        /*
> +         * Mark the dispatcher as not busy already here so that we
> +         * don't miss any new requests coming in the middle of our
> +         * processing.
> +         */
> +        atomic_mb_set(&qmp_dispatcher_co_busy, false);
> +
> +        while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
> +            /*
> +             * No more requests to process. Wait to be reentered from
> +             * handle_qmp_command() when it pushes more requests, or
> +             * from monitor_cleanup() when it requests shutdown.
> +             */
> +            if (!qmp_dispatcher_co_shutdown) {
> +                qemu_coroutine_yield();
> +
> +                /*
> +                 * busy must be set to true again by whoever
> +                 * rescheduled us to avoid double scheduling
> +                 */
> +                assert(atomic_xchg(&qmp_dispatcher_co_busy, false) == true);
> +            }
> +
> +            /*
> +             * qmp_dispatcher_co_shutdown may have changed if we
> +             * yielded and were reentered from monitor_cleanup()
> +             */
> +            if (qmp_dispatcher_co_shutdown) {
> +                return;
> +            }
> +        }
>
> -    if (need_resume) {
> -        /* Pairs with the monitor_suspend() in handle_qmp_command() */
> -        monitor_resume(&mon->common);
> -    }
> -    qmp_request_free(req_obj);
> +        if (atomic_xchg(&qmp_dispatcher_co_busy, true) == true) {
> +            /*
> +             * Someone rescheduled us (probably because a new requests
> +             * came in), but we didn't actually yield. Do that now,
> +             * only to be immediately reentered and removed from the
> +             * list of scheduled coroutines.
> +             */
> +            qemu_coroutine_yield();
> +        }
>
> -    /* Reschedule instead of looping so the main loop stays responsive */
> -    qemu_bh_schedule(qmp_dispatcher_bh);
> +        /*
> +         * Move the coroutine from iohandler_ctx to qemu_aio_context for
> +         * executing the command handler so that it can make progress if it
> +         * involves an AIO_WAIT_WHILE().
> +         */
> +        aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
> +        qemu_coroutine_yield();
> +
> +        mon = req_obj->mon;
> +        /* qmp_oob_enabled() might change after "qmp_capabilities" */
> +        need_resume = !qmp_oob_enabled(mon) ||
> +            mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
> +        if (req_obj->req) {
> +            QDict *qdict = qobject_to(QDict, req_obj->req);
> +            QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> +            trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> +            monitor_qmp_dispatch(mon, req_obj->req);
> +        } else {
> +            assert(req_obj->err);
> +            rsp = qmp_error_response(req_obj->err);
> +            req_obj->err = NULL;
> +            monitor_qmp_respond(mon, rsp);
> +            qobject_unref(rsp);
> +        }
> +
> +        if (need_resume) {
> +            /* Pairs with the monitor_suspend() in handle_qmp_command() */
> +            monitor_resume(&mon->common);
> +        }
> +        qmp_request_free(req_obj);
> +
> +        /*
> +         * Yield and reschedule so the main loop stays responsive.
> +         *
> +         * Move back to iohandler_ctx so that nested event loops for
> +         * qemu_aio_context don't start new monitor commands.
> +         */
> +        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
> +        qemu_coroutine_yield();
> +    }
>  }
>
>  static void handle_qmp_command(void *opaque, QObject *req, Error *err)
> @@ -302,7 +362,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
>      qemu_mutex_unlock(&mon->qmp_queue_lock);
>
>      /* Kick the dispatcher routine */
> -    qemu_bh_schedule(qmp_dispatcher_bh);
> +    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> +        aio_co_wake(qmp_dispatcher_co);
> +    }
>  }
>
>  static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
> diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
> index 5677ba92ca..754f7b854c 100644
> --- a/qapi/qmp-dispatch.c
> +++ b/qapi/qmp-dispatch.c
> @@ -12,12 +12,16 @@
>   */
>
>  #include "qemu/osdep.h"
> +
> +#include "block/aio.h"
>  #include "qapi/error.h"
>  #include "qapi/qmp/dispatch.h"
>  #include "qapi/qmp/qdict.h"
>  #include "qapi/qmp/qjson.h"
>  #include "sysemu/runstate.h"
>  #include "qapi/qmp/qbool.h"
> +#include "qemu/coroutine.h"
> +#include "qemu/main-loop.h"
>
>  static QDict *qmp_dispatch_check_obj(QDict *dict, bool allow_oob,
>                                       Error **errp)
> @@ -88,6 +92,30 @@ bool qmp_is_oob(const QDict *dict)
>          && !qdict_haskey(dict, "execute");
>  }
>
> +typedef struct QmpDispatchBH {
> +    const QmpCommand *cmd;
> +    Monitor *cur_mon;
> +    QDict *args;
> +    QObject **ret;
> +    Error **errp;
> +    Coroutine *co;
> +} QmpDispatchBH;
> +
> +static void do_qmp_dispatch_bh(void *opaque)
> +{
> +    QmpDispatchBH *data = opaque;
> +
> +    assert(monitor_cur() == NULL);
> +    monitor_set_cur(qemu_coroutine_self(), data->cur_mon);
> +    data->cmd->fn(data->args, data->ret, data->errp);
> +    monitor_set_cur(qemu_coroutine_self(), NULL);
> +    aio_co_wake(data->co);
> +}
> +
> +/*
> + * Runs outside of coroutine context for OOB commands, but in coroutine
> + * context for everything else.
> + */
>  QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
>                      bool allow_oob, Monitor *cur_mon)
>  {
> @@ -153,12 +181,35 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
>          qobject_ref(args);
>      }
>
> +    assert(!(oob && qemu_in_coroutine()));
>      assert(monitor_cur() == NULL);
> -    monitor_set_cur(qemu_coroutine_self(), cur_mon);
> -
> -    cmd->fn(args, &ret, &err);
> -
> -    monitor_set_cur(qemu_coroutine_self(), NULL);
> +    if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) {
> +        monitor_set_cur(qemu_coroutine_self(), cur_mon);
> +        cmd->fn(args, &ret, &err);
> +        monitor_set_cur(qemu_coroutine_self(), NULL);
> +    } else {
> +        /*
> +         * Not being in coroutine context implies that we're handling
> +         * an OOB command, which must not have QCO_COROUTINE.
> +         *
> +         * This implies that we are in coroutine context, but the
> +         * command doesn't have QCO_COROUTINE. We must drop out of
> +         * coroutine context for this one.
> +         */

I had to read this several times to get it. The first sentence leads me
into coroutine context, and then the next sentence tells me the
opposite, throwing me into confusion.

Perhaps something like this:

    /*
     * Actual context doesn't match the one the command needs.
     * Case 1: we are in coroutine context, but command does not
     * have QCO_COROUTINE. We need to drop out of coroutine
     * context for executing it.
     * Case 2: we are outside coroutine context, but command has
     * QCO_COROUTINE. Can't actually happen, because we get here
     * outside coroutine context only when executing a command
     * out of band, and OOB commands never have QCO_COROUTINE.
     */

> +        assert(!oob && qemu_in_coroutine() && !(cmd->options & QCO_COROUTINE));
> +
> +        QmpDispatchBH data = {
> +            .cur_mon = cur_mon,
> +            .cmd = cmd,
> +            .args = args,
> +            .ret = &ret,
> +            .errp = &err,
> +            .co = qemu_coroutine_self(),
> +        };
> +        aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh,
> +                                &data);
> +        qemu_coroutine_yield();
> +    }
>      qobject_unref(args);
>      if (err) {
>          /* or assert(!ret) after reviewing all handlers: */
> diff --git a/qapi/qmp-registry.c b/qapi/qmp-registry.c
> index d0f9a1d3e3..58c65b5052 100644
> --- a/qapi/qmp-registry.c
> +++ b/qapi/qmp-registry.c
> @@ -20,6 +20,9 @@ void qmp_register_command(QmpCommandList *cmds, const char *name,
>  {
>      QmpCommand *cmd = g_malloc0(sizeof(*cmd));
>
> +    /* QCO_COROUTINE and QCO_ALLOW_OOB are incompatible for now */
> +    assert(!((options & QCO_COROUTINE) && (options & QCO_ALLOW_OOB)));
> +
>      cmd->name = name;
>      cmd->fn = fn;
>      cmd->enabled = true;
> diff --git a/util/aio-posix.c b/util/aio-posix.c
> index f7f13ebfc2..30bb21d699 100644
> --- a/util/aio-posix.c
> +++ b/util/aio-posix.c
> @@ -15,6 +15,7 @@
>
>  #include "qemu/osdep.h"
>  #include "block/block.h"
> +#include "qemu/main-loop.h"
>  #include "qemu/rcu.h"
>  #include "qemu/rcu_queue.h"
>  #include "qemu/sockets.h"
> @@ -558,8 +559,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
>       * There cannot be two concurrent aio_poll calls for the same AioContext (or
>       * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
>       * We rely on this below to avoid slow locked accesses to ctx->notify_me.
> +     *
> +     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
> +     * is special in that it runs in the main thread, but that thread's context
> +     * is qemu_aio_context.
>       */
> -    assert(in_aio_context_home_thread(ctx));
> +    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
> +                                      qemu_get_aio_context() : ctx));
>
>      qemu_lockcnt_inc(&ctx->list_lock);
On Wed, Sep 09, 2020 at 05:11:48PM +0200, Kevin Wolf wrote:
> Add a function to move the current coroutine to the AioContext of a
> given BlockDriverState.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  include/block/block.h |  6 ++++++
>  block.c               | 10 ++++++++++
>  2 files changed, 16 insertions(+)
>
> diff --git a/include/block/block.h b/include/block/block.h
> index 981ab5b314..80ab322f11 100644
> --- a/include/block/block.h
> +++ b/include/block/block.h
> @@ -626,6 +626,12 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
>   */
>  AioContext *bdrv_get_aio_context(BlockDriverState *bs);
>
> +/**
> + * Move the current coroutine to the AioContext of @bs and return the old
> + * AioContext of the coroutine.
> + */
> +AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs);

I'm not sure this function handles all cases:
1. Being called without the BQL (i.e. from an IOThread).
2. Being called while a device stops using its IOThread.

The races that come to mind are fetching the AioContext for bs and then
scheduling a BH. The BH is executed later on by the event loop. There
might be cases where the AioContext for bs is updated before the BH
runs.

I didn't investigate these cases but wanted to mention them in case you
want to add doc comments about when this function can be used or if
you'd like to verify them yourself.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
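[Only the header declaration is quoted above; the block.c body under review
is not. From the declaration and the aio_co_reschedule_self() helper added
earlier in the series, it can be sketched roughly as follows — an
illustrative reconstruction, not necessarily the exact patch:

    AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs)
    {
        /* Remember where we came from so the caller can move back later */
        AioContext *old_ctx = qemu_get_current_aio_context();

        /* Reschedule the current coroutine into bs's AioContext and yield;
         * execution resumes in the new context */
        aio_co_reschedule_self(bdrv_get_aio_context(bs));
        return old_ctx;
    }
]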
On Mon, Sep 14, 2020 at 05:09:49PM +0200, Markus Armbruster wrote:
> Stefan Hajnoczi <stefanha@redhat.com> writes:
>
> > On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
> >> Some QMP command handlers can block the main loop for a relatively long
> >> time, for example because they perform some I/O. This is quite nasty.
> >> Allowing such handlers to run in a coroutine where they can yield (and
> >> therefore release the BQL) while waiting for an event such as I/O
> >> completion solves the problem.
> >>
> >> This series adds the infrastructure to allow this and switches
> >> block_resize to run in a coroutine as a first example.
> >>
> >> This is an alternative solution to Marc-André's "monitor: add
> >> asynchronous command type" series.
> >
> > Please clarify the following in the QAPI documentation:
> > * Is the QMP monitor suspended while the command is pending?
> > * Are QMP events reported while the command is pending?
>
> Good points. Kevin, I'd be willing to take this as a follow-up patch,
> if that's more convenient for you.
>
> > Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
>
> Stefan, I could use your proper review of PATCH 11-13. Pretty-please?

Sounds good. I have reviewed the patches 11-13 and left questions for
Kevin.
Am 14.09.2020 um 17:30 hat Markus Armbruster geschrieben:
> Kevin Wolf <kwolf@redhat.com> writes:
>
> > This moves the QMP dispatcher to a coroutine and runs all QMP command
> > handlers that declare 'coroutine': true in coroutine context so they
> > can avoid blocking the main loop while doing I/O or waiting for other
> > events.
> >
> > For commands that are not declared safe to run in a coroutine, the
> > dispatcher drops out of coroutine context by calling the QMP command
> > handler from a bottom half.
> >
> > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> > Reviewed-by: Markus Armbruster <armbru@redhat.com>
> > ---
[...]
> > diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
> > index 5677ba92ca..754f7b854c 100644
> > --- a/qapi/qmp-dispatch.c
> > +++ b/qapi/qmp-dispatch.c
[...]
> > @@ -153,12 +181,35 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
> >          qobject_ref(args);
> >      }
> >
> > +    assert(!(oob && qemu_in_coroutine()));
> >      assert(monitor_cur() == NULL);
> > -    monitor_set_cur(qemu_coroutine_self(), cur_mon);
> > -
> > -    cmd->fn(args, &ret, &err);
> > -
> > -    monitor_set_cur(qemu_coroutine_self(), NULL);
> > +    if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) {
> > +        monitor_set_cur(qemu_coroutine_self(), cur_mon);
> > +        cmd->fn(args, &ret, &err);
> > +        monitor_set_cur(qemu_coroutine_self(), NULL);
> > +    } else {
> > +        /*
> > +         * Not being in coroutine context implies that we're handling
> > +         * an OOB command, which must not have QCO_COROUTINE.
> > +         *
> > +         * This implies that we are in coroutine context, but the
> > +         * command doesn't have QCO_COROUTINE. We must drop out of
> > +         * coroutine context for this one.
> > +         */
>
> I had to read this several times to get it. The first sentence leads me
> into coroutine context, and then the next sentence tells me the
> opposite, throwing me into confusion.
>
> Perhaps something like this:
>
>     /*
>      * Actual context doesn't match the one the command needs.
>      * Case 1: we are in coroutine context, but command does not
>      * have QCO_COROUTINE. We need to drop out of coroutine
>      * context for executing it.
>      * Case 2: we are outside coroutine context, but command has
>      * QCO_COROUTINE. Can't actually happen, because we get here
>      * outside coroutine context only when executing a command
>      * out of band, and OOB commands never have QCO_COROUTINE.
>      */

Works for me. Can you squash this in while applying?

Kevin
Am 15.09.2020 um 16:31 hat Stefan Hajnoczi geschrieben:
> On Wed, Sep 09, 2020 at 05:11:48PM +0200, Kevin Wolf wrote:
> > Add a function to move the current coroutine to the AioContext of a
> > given BlockDriverState.
> >
> > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> > ---
> >  include/block/block.h |  6 ++++++
> >  block.c               | 10 ++++++++++
> >  2 files changed, 16 insertions(+)
> >
> > diff --git a/include/block/block.h b/include/block/block.h
> > index 981ab5b314..80ab322f11 100644
> > --- a/include/block/block.h
> > +++ b/include/block/block.h
> > @@ -626,6 +626,12 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
> >   */
> >  AioContext *bdrv_get_aio_context(BlockDriverState *bs);
> >
> > +/**
> > + * Move the current coroutine to the AioContext of @bs and return the old
> > + * AioContext of the coroutine.
> > + */
> > +AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs);
>
> I'm not sure this function handles all cases:
> 1. Being called without the BQL (i.e. from an IOThread).
> 2. Being called while a device stops using its IOThread.
>
> The races that come to mind are fetching the AioContext for bs and then
> scheduling a BH. The BH is executed later on by the event loop. There
> might be cases where the AioContext for bs is updated before the BH
> runs.

Updating the AioContext of a node drains the BDS first, so it would
execute any BHs still pending in the old AioContext. So this part looks
safe to me.

I'm not sure what you mean with the BQL part. I don't think I'm
accessing anything protected by the BQL?

> I didn't investigate these cases but wanted to mention them in case you
> want to add doc comments about when this function can be used or if
> you'd like to verify them yourself.

One thing that I'm not completely sure about (but that isn't really
related to this specific patch) is AioContext changes later in the
process when the actual command handler has yielded. We may have to be
careful to prevent those for coroutine based QMP commands in the block
layer.

By sheer luck, qmp_block_resize() creates a new BlockBackend that has
blk->allow_aio_context_change = false. So we're actually good in the
only command I'm converting. Phew.

Kevin
Am 10.09.2020 um 15:24 hat Stefan Hajnoczi geschrieben:
> On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
> > Some QMP command handlers can block the main loop for a relatively long
> > time, for example because they perform some I/O. This is quite nasty.
> > Allowing such handlers to run in a coroutine where they can yield (and
> > therefore release the BQL) while waiting for an event such as I/O
> > completion solves the problem.
> >
> > This series adds the infrastructure to allow this and switches
> > block_resize to run in a coroutine as a first example.
> >
> > This is an alternative solution to Marc-André's "monitor: add
> > asynchronous command type" series.
>
> Please clarify the following in the QAPI documentation:
> * Is the QMP monitor suspended while the command is pending?

Suspended as in monitor_suspend()? No.

> * Are QMP events reported while the command is pending?

Hm, I don't know to be honest. But I think so.

Does it matter, though? I don't think events have a defined order
compared to command results, and the client can't respond to the event
anyway until the current command has completed.

Kevin
Kevin Wolf <kwolf@redhat.com> writes:

> Am 14.09.2020 um 17:30 hat Markus Armbruster geschrieben:
>> Kevin Wolf <kwolf@redhat.com> writes:
>>
>> > This moves the QMP dispatcher to a coroutine and runs all QMP command
>> > handlers that declare 'coroutine': true in coroutine context so they
>> > can avoid blocking the main loop while doing I/O or waiting for other
>> > events.
>> >
>> > For commands that are not declared safe to run in a coroutine, the
>> > dispatcher drops out of coroutine context by calling the QMP command
>> > handler from a bottom half.
>> >
>> > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
>> > Reviewed-by: Markus Armbruster <armbru@redhat.com>
>> > ---
[...]
>> > diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
>> > index 5677ba92ca..754f7b854c 100644
>> > --- a/qapi/qmp-dispatch.c
>> > +++ b/qapi/qmp-dispatch.c
[...]
>> > @@ -153,12 +181,35 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
>> >          qobject_ref(args);
>> >      }
>> >
>> > +    assert(!(oob && qemu_in_coroutine()));
>> >      assert(monitor_cur() == NULL);
>> > -    monitor_set_cur(qemu_coroutine_self(), cur_mon);
>> > -
>> > -    cmd->fn(args, &ret, &err);
>> > -
>> > -    monitor_set_cur(qemu_coroutine_self(), NULL);
>> > +    if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) {
>> > +        monitor_set_cur(qemu_coroutine_self(), cur_mon);
>> > +        cmd->fn(args, &ret, &err);
>> > +        monitor_set_cur(qemu_coroutine_self(), NULL);
>> > +    } else {
>> > +        /*
>> > +         * Not being in coroutine context implies that we're handling
>> > +         * an OOB command, which must not have QCO_COROUTINE.
>> > +         *
>> > +         * This implies that we are in coroutine context, but the
>> > +         * command doesn't have QCO_COROUTINE. We must drop out of
>> > +         * coroutine context for this one.
>> > +         */
>>
>> I had to read this several times to get it. The first sentence leads me
>> into coroutine context, and then the next sentence tells me the
>> opposite, throwing me into confusion.
>>
>> Perhaps something like this:
>>
>>     /*
>>      * Actual context doesn't match the one the command needs.
>>      * Case 1: we are in coroutine context, but command does not
>>      * have QCO_COROUTINE. We need to drop out of coroutine
>>      * context for executing it.
>>      * Case 2: we are outside coroutine context, but command has
>>      * QCO_COROUTINE. Can't actually happen, because we get here
>>      * outside coroutine context only when executing a command
>>      * out of band, and OOB commands never have QCO_COROUTINE.
>>      */
>
> Works for me. Can you squash this in while applying?

Sure!
On Fri, Sep 25, 2020 at 07:15:41PM +0200, Kevin Wolf wrote:
> Am 10.09.2020 um 15:24 hat Stefan Hajnoczi geschrieben:
> > On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
> > > Some QMP command handlers can block the main loop for a relatively long
> > > time, for example because they perform some I/O. This is quite nasty.
> > > Allowing such handlers to run in a coroutine where they can yield (and
> > > therefore release the BQL) while waiting for an event such as I/O
> > > completion solves the problem.
> > >
> > > This series adds the infrastructure to allow this and switches
> > > block_resize to run in a coroutine as a first example.
> > >
> > > This is an alternative solution to Marc-André's "monitor: add
> > > asynchronous command type" series.
> >
> > Please clarify the following in the QAPI documentation:
> > * Is the QMP monitor suspended while the command is pending?
>
> Suspended as in monitor_suspend()? No.
>
> > * Are QMP events reported while the command is pending?
>
> Hm, I don't know to be honest. But I think so.
>
> Does it matter, though? I don't think events have a defined order
> compared to command results, and the client can't respond to the event
> anyway until the current command has completed.

You're right, I don't think it matters because the client must expect
QMP events at any time.

I was trying to understand the semantics of coroutine monitor commands
from two perspectives:

1. The QMP client - do coroutine commands behave differently from
   non-coroutine commands? I think the answer is no. The monitor will
   not process more commands until the coroutine finishes?

2. The command implementation - which thread does the coroutine run in?
   I guess it runs in the main loop thread with the BQL and the
   AioContext acquired?
On Fri, Sep 25, 2020 at 06:00:51PM +0200, Kevin Wolf wrote:
> Am 15.09.2020 um 16:31 hat Stefan Hajnoczi geschrieben:
> > On Wed, Sep 09, 2020 at 05:11:48PM +0200, Kevin Wolf wrote:
> > > Add a function to move the current coroutine to the AioContext of a
> > > given BlockDriverState.
> > >
> > > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> > > ---
> > >  include/block/block.h |  6 ++++++
> > >  block.c               | 10 ++++++++++
> > >  2 files changed, 16 insertions(+)
> > >
> > > diff --git a/include/block/block.h b/include/block/block.h
> > > index 981ab5b314..80ab322f11 100644
> > > --- a/include/block/block.h
> > > +++ b/include/block/block.h
> > > @@ -626,6 +626,12 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
> > >   */
> > >  AioContext *bdrv_get_aio_context(BlockDriverState *bs);
> > >
> > > +/**
> > > + * Move the current coroutine to the AioContext of @bs and return the old
> > > + * AioContext of the coroutine.
> > > + */
> > > +AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs);
> >
> > I'm not sure this function handles all cases:
> > 1. Being called without the BQL (i.e. from an IOThread).
> > 2. Being called while a device stops using its IOThread.
> >
> > The races that come to mind are fetching the AioContext for bs and then
> > scheduling a BH. The BH is executed later on by the event loop. There
> > might be cases where the AioContext for bs is updated before the BH
> > runs.

The scenario I'm thinking about is where bs' AioContext changes while we
are trying to move there.

There is a window of time between fetching bs' AioContext, scheduling a
BH in our old AioContext (not in bs' AioContext), and then scheduling
the coroutine into the AioContext we previously fetched for bs.

Is it possible for the AioContext value we stashed to be outdated by the
time we use it?

I think the answer is that it's safe to use this function from the main
loop thread under the BQL. That way nothing else will change bs'
AioContext while we're running. But it's probably not safe to use this
function from an arbitrary IOThread (without the BQL).

I think this limitation is okay but it needs to be documented. Maybe an
assertion can verify that it holds.

Stefan
On Fri, Sep 25, 2020 at 06:07:50PM +0200, Kevin Wolf wrote:
> Am 15.09.2020 um 16:57 hat Stefan Hajnoczi geschrieben:
> > On Wed, Sep 09, 2020 at 05:11:49PM +0200, Kevin Wolf wrote:
> > > @@ -2456,8 +2456,7 @@ void qmp_block_resize(bool has_device, const char *device,
> > >          return;
> > >      }
> > >
> > > -    aio_context = bdrv_get_aio_context(bs);
> > > -    aio_context_acquire(aio_context);
> > > +    old_ctx = bdrv_co_move_to_aio_context(bs);
> > >
> > >      if (size < 0) {
> > >          error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "size", "a >0 size");
> >
> > Is it safe to call blk_new() outside the BQL since it mutates global state?
> >
> > In other words, could another thread race with us?
>
> Hm, probably not.
>
> Would it be safer to have the bdrv_co_move_to_aio_context() call only
> immediately before the drain?

Yes, sounds good.

> > > @@ -2479,8 +2478,8 @@ void qmp_block_resize(bool has_device, const char *device,
> > >      bdrv_drained_end(bs);
> > >
> > >  out:
> > > +    aio_co_reschedule_self(old_ctx);
> > >      blk_unref(blk);
> > > -    aio_context_release(aio_context);
> >
> > The following precondition is violated by the blk_unref -> bdrv_drain ->
> > AIO_WAIT_WHILE() call if blk->refcnt is 1 here:
> >
> >  * The caller's thread must be the IOThread that owns @ctx or the main loop
> >  * thread (with @ctx acquired exactly once).
> >
> > blk_unref() is called from the main loop thread without having acquired
> > blk's AioContext.
> >
> > Normally blk->refcnt will be > 1 so bdrv_drain() won't be called, but
> > I'm not sure if that can be guaranteed.
> >
> > The following seems safer although it's uglier:
> >
> >   aio_context = bdrv_get_aio_context(bs);
> >   aio_context_acquire(aio_context);
> >   blk_unref(blk);
> >   aio_context_release(aio_context);
>
> May we actually acquire aio_context if blk is in the main thread? I
> think we must only do this if it's in a different iothread because we'd
> end up with a recursive lock and drain would hang.

Right :). Maybe an aio_context_acquire_once() API would help.

Stefan
Kevin Wolf <kwolf@redhat.com> writes:

> Am 10.09.2020 um 15:24 hat Stefan Hajnoczi geschrieben:
>> On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
>> > Some QMP command handlers can block the main loop for a relatively long
>> > time, for example because they perform some I/O. This is quite nasty.
>> > Allowing such handlers to run in a coroutine where they can yield (and
>> > therefore release the BQL) while waiting for an event such as I/O
>> > completion solves the problem.
>> >
>> > This series adds the infrastructure to allow this and switches
>> > block_resize to run in a coroutine as a first example.
>> >
>> > This is an alternative solution to Marc-André's "monitor: add
>> > asynchronous command type" series.
>>
>> Please clarify the following in the QAPI documentation:
>> * Is the QMP monitor suspended while the command is pending?
>
> Suspended as in monitor_suspend()? No.

A suspended monitor doesn't read monitor input. We suspend

* a QMP monitor while the request queue is full

* an HMP monitor while it executes a command

* a multiplexed HMP monitor while the "mux-focus" is elsewhere

* an HMP monitor when it executes command "quit", forever

* an HMP monitor while it executes command "migrate" without -d

Let me explain the first item in a bit more detail.

Before OOB, a QMP monitor was also suspended while it executed a
command. To make OOB work, we moved the QMP monitors to an I/O thread
and added a request queue, drained by the main loop. QMP monitors
continue to read commands, execute right away if OOB, else queue,
suspend when queue gets full, resume when it gets non-full.

The "run command in coroutine context" feature does not affect any of
this.

qapi-code-gen.txt does not talk about monitor suspension at all. It's
instead discussed in qmp-spec.txt section 2.3.1 Out-of-band execution.

Stefan, what would you like us to clarify, and where?

>> * Are QMP events reported while the command is pending?
>
> Hm, I don't know to be honest. But I think so.

Yes, events should be reported while a command is being executed.

Sending events takes locks. Their critical sections are all short-lived.
Another possible delay is the underlying character device failing the
send with EAGAIN. That's all.

Fine print: qapi_event_emit() takes @monitor_lock. It sends to each QMP
monitor with qmp_send_response(), which uses monitor_puts(), which takes
the monitor's @mon_lock.

The "run command in coroutine context" feature does not affect any of
this.

> Does it matter, though? I don't think events have a defined order
> compared to command results, and the client can't respond to the event
> anyway until the current command has completed.

Stefan, what would you like us to clarify, and where?
Am 28.09.2020 um 10:46 hat Stefan Hajnoczi geschrieben:
> On Fri, Sep 25, 2020 at 07:15:41PM +0200, Kevin Wolf wrote:
> > Am 10.09.2020 um 15:24 hat Stefan Hajnoczi geschrieben:
> > > On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
> > > > Some QMP command handlers can block the main loop for a relatively long
> > > > time, for example because they perform some I/O. This is quite nasty.
> > > > Allowing such handlers to run in a coroutine where they can yield (and
> > > > therefore release the BQL) while waiting for an event such as I/O
> > > > completion solves the problem.
> > > >
> > > > This series adds the infrastructure to allow this and switches
> > > > block_resize to run in a coroutine as a first example.
> > > >
> > > > This is an alternative solution to Marc-André's "monitor: add
> > > > asynchronous command type" series.
> > >
> > > Please clarify the following in the QAPI documentation:
> > > * Is the QMP monitor suspended while the command is pending?
> >
> > Suspended as in monitor_suspend()? No.
> >
> > > * Are QMP events reported while the command is pending?
> >
> > Hm, I don't know to be honest. But I think so.
> >
> > Does it matter, though? I don't think events have a defined order
> > compared to command results, and the client can't respond to the event
> > anyway until the current command has completed.
>
> You're right, I don't think it matters because the client must expect
> QMP events at any time.
>
> I was trying to understand the semantics of coroutine monitor commands
> from two perspectives:
>
> 1. The QMP client - do coroutine commands behave differently from
>    non-coroutine commands? I think the answer is no. The monitor will
>    not process more commands until the coroutine finishes?

No, on the wire, things should look exactly the same. If you consider
more than the QMP traffic and the client communicates with the guest,
it might see that the guest isn't blocked any more, of course.

> 2. The command implementation - which thread does the coroutine run in?
>    I guess it runs in the main loop thread with the BQL and the
>    AioContext acquired?

By default, yes. But the coroutine can reschedule itself to another
thread. Block-related handlers will want to reschedule themselves to
bs->aio_context because you can't just hold the AioContext lock from
another thread across yields. This is what block_resize does after this
series.

Kevin
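[Concretely, the handler shape Kevin describes can be sketched against
qmp_block_resize() as converted by this series. This is a condensed
illustration with lookup and error handling elided, not the literal
patch:

    void coroutine_fn qmp_block_resize(bool has_device, const char *device,
                                       bool has_node_name,
                                       const char *node_name,
                                       int64_t size, Error **errp)
    {
        BlockDriverState *bs;
        BlockBackend *blk;
        AioContext *old_ctx;

        /* Entered in the main loop thread, holding the BQL */
        /* ... look up @bs, validate @size, create @blk ... */

        /* Hop into the AioContext of @bs; yields below release the BQL */
        old_ctx = bdrv_co_move_to_aio_context(bs);

        bdrv_drained_begin(bs);
        blk_truncate(blk, size, true, PREALLOC_MODE_OFF, 0, errp);
        bdrv_drained_end(bs);

        /* Hop back before dropping the reference in the main context */
        aio_co_reschedule_self(old_ctx);
        blk_unref(blk);
    }
]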
Am 28.09.2020 um 10:59 hat Stefan Hajnoczi geschrieben:
> On Fri, Sep 25, 2020 at 06:00:51PM +0200, Kevin Wolf wrote:
> > Am 15.09.2020 um 16:31 hat Stefan Hajnoczi geschrieben:
> > > On Wed, Sep 09, 2020 at 05:11:48PM +0200, Kevin Wolf wrote:
> > > > Add a function to move the current coroutine to the AioContext of a
> > > > given BlockDriverState.
> > > >
> > > > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> > > > ---
> > > >  include/block/block.h |  6 ++++++
> > > >  block.c               | 10 ++++++++++
> > > >  2 files changed, 16 insertions(+)
> > > >
> > > > diff --git a/include/block/block.h b/include/block/block.h
> > > > index 981ab5b314..80ab322f11 100644
> > > > --- a/include/block/block.h
> > > > +++ b/include/block/block.h
> > > > @@ -626,6 +626,12 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
> > > >   */
> > > >  AioContext *bdrv_get_aio_context(BlockDriverState *bs);
> > > >
> > > > +/**
> > > > + * Move the current coroutine to the AioContext of @bs and return the old
> > > > + * AioContext of the coroutine.
> > > > + */
> > > > +AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs);
> > >
> > > I'm not sure this function handles all cases:
> > > 1. Being called without the BQL (i.e. from an IOThread).
> > > 2. Being called while a device stops using its IOThread.
> > >
> > > The races that come to mind are fetching the AioContext for bs and then
> > > scheduling a BH. The BH is executed later on by the event loop. There
> > > might be cases where the AioContext for bs is updated before the BH
> > > runs.
>
> The scenario I'm thinking about is where bs' AioContext changes while we
> are trying to move there.
>
> There is a window of time between fetching bs' AioContext, scheduling a
> BH in our old AioContext (not in bs' AioContext), and then scheduling
> the coroutine into the AioContext we previously fetched for bs.
>
> Is it possible for the AioContext value we stashed to be outdated by the
> time we use it?
>
> I think the answer is that it's safe to use this function from the main
> loop thread under the BQL. That way nothing else will change bs'
> AioContext while we're running. But it's probably not safe to use this
> function from an arbitrary IOThread (without the BQL).

It's probably the safest to treat it as such. The first part of it (the
window between fetching bs->aio_context and using it) is actually also
true for this ubiquitous sequence:

    AioContext *ctx = bdrv_get_aio_context(bs);
    aio_context_acquire(ctx);

I never really thought about this, but this is only safe in the main
thread. Most of its users are of course monitor command handlers, which
always run in the main thread.

> I think this limitation is okay but it needs to be documented. Maybe an
> assertion can verify that it holds.

Yes, why not.

Maybe we should actually change the interface into a pair of
bdrv_co_enter/leave() functions that also increase bs->in_flight so that
the whole section will complete before the AioContext of bs changes
(changing the AioContext while the handler coroutine has yielded and
will continue to run in the old context would be bad).

block_resize is safe without it, but it might be better to introduce
patterns that will be safe without being extra careful in each command.

Kevin
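[The proposed pair could look roughly like this — a hypothetical sketch of
the idea, not an actual patch from the series:

    /* Enter the AioContext of @bs and keep it stable until the matching
     * bdrv_co_leave(): the elevated in_flight count makes a concurrent
     * AioContext change wait in its drained section until we leave. */
    AioContext *coroutine_fn bdrv_co_enter(BlockDriverState *bs)
    {
        AioContext *old_ctx = qemu_get_current_aio_context();

        bdrv_inc_in_flight(bs);
        aio_co_reschedule_self(bdrv_get_aio_context(bs));
        return old_ctx;
    }

    void coroutine_fn bdrv_co_leave(BlockDriverState *bs, AioContext *old_ctx)
    {
        aio_co_reschedule_self(old_ctx);
        bdrv_dec_in_flight(bs);
    }
]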
Am 28.09.2020 um 11:05 hat Stefan Hajnoczi geschrieben:
> On Fri, Sep 25, 2020 at 06:07:50PM +0200, Kevin Wolf wrote:
> > Am 15.09.2020 um 16:57 hat Stefan Hajnoczi geschrieben:
> > > On Wed, Sep 09, 2020 at 05:11:49PM +0200, Kevin Wolf wrote:
> > > > @@ -2456,8 +2456,7 @@ void qmp_block_resize(bool has_device, const char *device,
> > > >          return;
> > > >      }
> > > >
> > > > -    aio_context = bdrv_get_aio_context(bs);
> > > > -    aio_context_acquire(aio_context);
> > > > +    old_ctx = bdrv_co_move_to_aio_context(bs);
> > > >
> > > >      if (size < 0) {
> > > >          error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "size", "a >0 size");
> > >
> > > Is it safe to call blk_new() outside the BQL since it mutates global state?
> > >
> > > In other words, could another thread race with us?
> >
> > Hm, probably not.
> >
> > Would it be safer to have the bdrv_co_move_to_aio_context() call only
> > immediately before the drain?
>
> Yes, sounds good.
>
> > > > @@ -2479,8 +2478,8 @@ void qmp_block_resize(bool has_device, const char *device,
> > > >      bdrv_drained_end(bs);
> > > >
> > > >  out:
> > > > +    aio_co_reschedule_self(old_ctx);
> > > >      blk_unref(blk);
> > > > -    aio_context_release(aio_context);
> > >
> > > The following precondition is violated by the blk_unref -> bdrv_drain ->
> > > AIO_WAIT_WHILE() call if blk->refcnt is 1 here:
> > >
> > >  * The caller's thread must be the IOThread that owns @ctx or the main loop
> > >  * thread (with @ctx acquired exactly once).
> > >
> > > blk_unref() is called from the main loop thread without having acquired
> > > blk's AioContext.
> > >
> > > Normally blk->refcnt will be > 1 so bdrv_drain() won't be called, but
> > > I'm not sure if that can be guaranteed.
> > >
> > > The following seems safer although it's uglier:
> > >
> > >   aio_context = bdrv_get_aio_context(bs);
> > >   aio_context_acquire(aio_context);
> > >   blk_unref(blk);
> > >   aio_context_release(aio_context);
> >
> > May we actually acquire aio_context if blk is in the main thread? I
> > think we must only do this if it's in a different iothread because we'd
> > end up with a recursive lock and drain would hang.
>
> Right :). Maybe an aio_context_acquire_once() API would help.

If you want it to work in the general case, how would you implement
this? As far as I know there is no way to tell whether we already own
the lock or not.

Something like aio_context_acquire_unless_self() might be easier to
implement.

Kevin
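[A possible shape for such a helper — hypothetical sketch only; it tests
for the home thread rather than for actual lock ownership, which is the
closest approximation available given the limitation Kevin describes:

    /* Acquire @ctx unless the calling thread is its home thread (the main
     * loop thread for qemu_aio_context, the iothread for its own context).
     * Returns true if the caller must aio_context_release(ctx) later. */
    static bool aio_context_acquire_unless_self(AioContext *ctx)
    {
        if (ctx == qemu_get_current_aio_context()) {
            /* Already home: acquiring again would make a nested drain hang */
            return false;
        }
        aio_context_acquire(ctx);
        return true;
    }

The blk_unref() discussed above could then be bracketed with this helper,
calling aio_context_release() afterwards only when it returned true.]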
Additional nitpick detail on Kevin's request.

Kevin Wolf <kwolf@redhat.com> writes:

> cur_mon really needs to be coroutine-local as soon as we move monitor
> command handlers to coroutines and let them yield. As a first step, just
> remove all direct accesses to cur_mon so that we can implement this in
> the getter function later.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>

[...]

> diff --git a/tests/test-util-sockets.c b/tests/test-util-sockets.c
> index af9f5c0c70..6c50dbf051 100644
> --- a/tests/test-util-sockets.c
> +++ b/tests/test-util-sockets.c
> @@ -52,6 +52,7 @@ static void test_fd_is_socket_good(void)
>
>  static int mon_fd = -1;
>  static const char *mon_fdname;
> +__thread Monitor *cur_mon;
>
>  int monitor_get_fd(Monitor *mon, const char *fdname, Error **errp)
>  {
> @@ -66,15 +67,12 @@ int monitor_get_fd(Monitor *mon, const char *fdname, Error **errp)
>
>  /*
>   * Syms of stubs in libqemuutil.a are discarded at .o file granularity.
> - * To replace monitor_get_fd() we must ensure everything in
> - * stubs/monitor.c is defined, to make sure monitor.o is discarded
> - * otherwise we get duplicate syms at link time.
> + * To replace monitor_get_fd() and monitor_cur(), we must ensure that we also
> + * replace any other symbol that is used in the binary and would be taken from
> + * the same stub object file, otherwise we get duplicate syms at link time.

Wrapping the comment around column 70 or so would make it easier to
read.

File has no maintainers.  Up to you.

>  */
> -__thread Monitor *cur_mon;
> +Monitor *monitor_cur(void) { return cur_mon; }
>  int monitor_vprintf(Monitor *mon, const char *fmt, va_list ap) { abort(); }
> -void monitor_init_qmp(Chardev *chr, bool pretty, Error **errp) {}
> -void monitor_init_hmp(Chardev *chr, bool use_readline, Error **errp) {}
> -
>
>  static void test_socket_fd_pass_name_good(void)
>  {

[...]
Additional nitpick detail on Kevin's request.

Kevin Wolf <kwolf@redhat.com> writes:

> This moves the QMP dispatcher to a coroutine and runs all QMP command
> handlers that declare 'coroutine': true in coroutine context so they
> can avoid blocking the main loop while doing I/O or waiting for other
> events.
>
> For commands that are not declared safe to run in a coroutine, the
> dispatcher drops out of coroutine context by calling the QMP command
> handler from a bottom half.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> Reviewed-by: Markus Armbruster <armbru@redhat.com>

[...]

> diff --git a/monitor/monitor.c b/monitor/monitor.c
> index 629aa073ee..ac2722bf91 100644
> --- a/monitor/monitor.c
> +++ b/monitor/monitor.c
> @@ -55,8 +55,32 @@ typedef struct {
>  /* Shared monitor I/O thread */
>  IOThread *mon_iothread;
>
> -/* Bottom half to dispatch the requests received from I/O thread */
> -QEMUBH *qmp_dispatcher_bh;
> +/* Coroutine to dispatch the requests received from I/O thread */
> +Coroutine *qmp_dispatcher_co;
> +
> +/* Set to true when the dispatcher coroutine should terminate */
> +bool qmp_dispatcher_co_shutdown;
> +
> +/*
> + * qmp_dispatcher_co_busy is used for synchronisation between the
> + * monitor thread and the main thread to ensure that the dispatcher
> + * coroutine never gets scheduled a second time when it's already
> + * scheduled (scheduling the same coroutine twice is forbidden).
> + *
> + * It is true if the coroutine is active and processing requests.
> + * Additional requests may then be pushed onto mon->qmp_requests,
> + * and @qmp_dispatcher_co_shutdown may be set without further ado.
> + * @qmp_dispatcher_co_busy must not be woken up in this case.
> + *
> + * If false, you also have to set @qmp_dispatcher_co_busy to true and
> + * wake up @qmp_dispatcher_co after pushing the new requests.
> + *
> + * The coroutine will automatically change this variable back to false
> + * before it yields.  Nobody else may set the variable to false.
> + *
> + * Access must be atomic for thread safety.
> + */
> +bool qmp_dispatcher_co_busy;
>
>  /*
>   * Protects mon_list, monitor_qapi_event_state, coroutine_mon,
> @@ -623,9 +647,24 @@ void monitor_cleanup(void)
>      }
>      qemu_mutex_unlock(&monitor_lock);
>
> -    /* QEMUBHs needs to be deleted before destroying the I/O thread */
> -    qemu_bh_delete(qmp_dispatcher_bh);
> -    qmp_dispatcher_bh = NULL;
> +    /*
> +     * The dispatcher needs to stop before destroying the I/O thread.
> +     *
> +     * We need to poll both qemu_aio_context and iohandler_ctx to make
> +     * sure that the dispatcher coroutine keeps making progress and
> +     * eventually terminates.  qemu_aio_context is automatically
> +     * polled by calling AIO_WAIT_WHILE on it, but we must poll
> +     * iohandler_ctx manually.
> +     */
> +    qmp_dispatcher_co_shutdown = true;
> +    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> +        aio_co_wake(qmp_dispatcher_co);
> +    }
> +
> +    AIO_WAIT_WHILE(qemu_get_aio_context(),
> +                   (aio_poll(iohandler_get_aio_context(), false),
> +                    atomic_mb_read(&qmp_dispatcher_co_busy)));
> +
>      if (mon_iothread) {
>          iothread_destroy(mon_iothread);
>          mon_iothread = NULL;
> @@ -649,9 +688,9 @@ void monitor_init_globals_core(void)
>       * have commands assuming that context.  It would be nice to get
>       * rid of those assumptions.
>       */
> -    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
> -                                   monitor_qmp_bh_dispatcher,
> -                                   NULL);
> +    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);

Rather long line, caused by rather long identifiers.
Not your fault; you imitated the existing pattern of static variable
qmp_dispatcher_bh / extern function monitor_qmp_bh_dispatcher().  But
the prefix monitor_qmp_ is awfully long, and not just here.  Let's
leave this for another day.

> +    atomic_mb_set(&qmp_dispatcher_co_busy, true);
> +    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
>  }
>
>  int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp)

[...]

> diff --git a/util/aio-posix.c b/util/aio-posix.c
> index f7f13ebfc2..30bb21d699 100644
> --- a/util/aio-posix.c
> +++ b/util/aio-posix.c
> @@ -15,6 +15,7 @@
>
>  #include "qemu/osdep.h"
>  #include "block/block.h"
> +#include "qemu/main-loop.h"
>  #include "qemu/rcu.h"
>  #include "qemu/rcu_queue.h"
>  #include "qemu/sockets.h"
> @@ -558,8 +559,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
>   * There cannot be two concurrent aio_poll calls for the same AioContext (or
>   * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
>   * We rely on this below to avoid slow locked accesses to ctx->notify_me.
> + *
> + * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
> + * is special in that it runs in the main thread, but that thread's context
> + * is qemu_aio_context.

Wrapping the comment around column 70 or so would make it easier to
read.  Up to you.

>   */
> -    assert(in_aio_context_home_thread(ctx));
> +    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
> +                                      qemu_get_aio_context() : ctx));
>
>      qemu_lockcnt_inc(&ctx->list_lock);
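To make the wake-up protocol from the comment block above concrete,
the producer side boils down to this pattern (a restatement of the
code already in the patch, not a new API):

    /*
     * Wake the dispatcher only if it was idle: atomic_xchg() returns
     * the previous value, so exactly one waker sees false and calls
     * aio_co_wake().  A busy dispatcher picks up queued requests by
     * itself and must not be scheduled a second time.
     */
    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
        aio_co_wake(qmp_dispatcher_co);
    }

The same pattern is used both for pushing new requests onto
mon->qmp_requests and for requesting shutdown via
qmp_dispatcher_co_shutdown in monitor_cleanup().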
Additional nitpick detail on Kevin's request.

Kevin Wolf <kwolf@redhat.com> writes:

> Add a function that can be used to move the currently running coroutine
> to a different AioContext (and therefore potentially a different
> thread).
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  include/block/aio.h | 10 ++++++++++
>  util/async.c        | 30 ++++++++++++++++++++++++++++++
>  2 files changed, 40 insertions(+)
>
> diff --git a/include/block/aio.h b/include/block/aio.h
> index b2f703fa3f..c37617b404 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -17,6 +17,7 @@
>  #ifdef CONFIG_LINUX_IO_URING
>  #include <liburing.h>
>  #endif
> +#include "qemu/coroutine.h"
>  #include "qemu/queue.h"
>  #include "qemu/event_notifier.h"
>  #include "qemu/thread.h"
> @@ -654,6 +655,15 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
>   */
>  void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
>
> +/**
> + * aio_co_reschedule_self:
> + * @new_ctx: the new context
> + *
> + * Move the currently running coroutine to new_ctx. If the coroutine is already
> + * running in new_ctx, do nothing.

Wrapping the comment around column 70 or so would make it easier to
read.  Up to you.

> + */
> +void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx);
> +
>  /**
>   * aio_co_wake:
>   * @co: the coroutine
> diff --git a/util/async.c b/util/async.c
> index 4266745dee..a609e18693 100644
> --- a/util/async.c
> +++ b/util/async.c
> @@ -569,6 +569,36 @@ void aio_co_schedule(AioContext *ctx, Coroutine *co)
>      aio_context_unref(ctx);
>  }
>
> +typedef struct AioCoRescheduleSelf {
> +    Coroutine *co;
> +    AioContext *new_ctx;
> +} AioCoRescheduleSelf;
> +
> +static void aio_co_reschedule_self_bh(void *opaque)
> +{
> +    AioCoRescheduleSelf *data = opaque;
> +    aio_co_schedule(data->new_ctx, data->co);
> +}
> +
> +void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
> +{
> +    AioContext *old_ctx = qemu_get_current_aio_context();
> +
> +    if (old_ctx != new_ctx) {
> +        AioCoRescheduleSelf data = {
> +            .co = qemu_coroutine_self(),
> +            .new_ctx = new_ctx,
> +        };
> +        /*
> +         * We can't directly schedule the coroutine in the target context
> +         * because this would be racy: The other thread could try to enter the
> +         * coroutine before it has yielded in this one.
> +         */

Likewise.

> +        aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data);
> +        qemu_coroutine_yield();
> +    }
> +}
> +
>  void aio_co_wake(struct Coroutine *co)
>  {
>      AioContext *ctx;
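A usage sketch of the new primitive, assuming a coroutine handler that
needs to do I/O in a BlockDriverState's home context and then return
to its original context (the function name is hypothetical, not part
of the patch):

    /* Hypothetical example of using aio_co_reschedule_self() */
    static void coroutine_fn do_work_in_bs_context(BlockDriverState *bs)
    {
        AioContext *old_ctx = qemu_get_current_aio_context();

        /* Hop into bs' home context; a no-op if we are already there */
        aio_co_reschedule_self(bdrv_get_aio_context(bs));

        /* ... I/O that must run in bs' AioContext ... */

        /* Hop back so the caller resumes where it started */
        aio_co_reschedule_self(old_ctx);
    }

The BH-plus-yield dance inside aio_co_reschedule_self() ensures the
coroutine has yielded in the old context before the new context can
enter it, which is exactly the race the comment in the patch warns
about.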