[v7,00/13] monitor: Optionally run handlers in coroutines

Message ID 20200909151149.490589-1-kwolf@redhat.com

Message

Kevin Wolf Sept. 9, 2020, 3:11 p.m. UTC
Some QMP command handlers can block the main loop for a relatively long
time, for example because they perform some I/O. This is quite nasty.
Allowing such handlers to run in a coroutine where they can yield (and
therefore release the BQL) while waiting for an event such as I/O
completion solves the problem.

This series adds the infrastructure to allow this and switches
block_resize to run in a coroutine as a first example.

This is an alternative solution to Marc-André's "monitor: add
asynchronous command type" series.
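
For illustration, opting a command into coroutine context is a single
new schema flag. Roughly (abridged from what patches 8 and 13 do to
qapi/block-core.json; the exact definition may differ):

  { 'command': 'block_resize',
    'data': { '*device': 'str',
              '*node-name': 'str',
              'size': 'int' },
    'coroutine': true }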

v7:
- Added patch 2 to add a Monitor parameter to monitor_get_cpu_index(),
  too [Markus]
- Let monitor_set_cur() return the old monitor [Markus]
- Fixed comment about linking stub objects in test-util-sockets [Markus]
- More detailed commit message for per-coroutine current monitor and
  document that monitor_set_cur(NULL) must be called eventually [Markus]
- Resolve some merge conflicts on rebase

v6:
- Fixed cur_mon behaviour: It should always point to the Monitor while
  we're in the handler coroutine, but be NULL while the handler
  coroutine has yielded. [Markus]
- Give HMP handlers the coroutine option, too, because they will call
  QMP handlers, and life is easier when we know whether we are in
  coroutine context or not.
- Fixed block_resize for block devices in iothreads and for HMP
- Resolved some merge conflict with QAPI generator and monitor
  refactorings that were merged in the meantime

v5:
- Improved comments and documentation [Markus]

v4:
- Forbid 'oob': true, 'coroutine': true [Markus]
- Removed Python type hints [Markus]
- Introduced separate bool qmp_dispatcher_co_shutdown to make it clearer
  how a shutdown request is signalled to the dispatcher [Markus]
- Allow using aio_poll() with iohandler_ctx and use that instead of
  aio_bh_poll() [Markus]
- Removed coroutine_fn from qmp_block_resize() again because at least
  one caller (HMP) calls it outside of coroutine context [Markus]
- Tried to make the synchronisation between dispatcher and the monitor
  thread clearer, and fixed a race condition
- Improved documentation and comments

v3:
- Fix race between monitor thread and dispatcher that could schedule the
  dispatcher coroutine twice if a second request comes in before the
  dispatcher can wake up [Patchew]

v2:
- Fix typo in a commit message [Eric]
- Use hyphen instead of underscore for the test command [Eric]
- Mark qmp_block_resize() as coroutine_fn [Stefan]


Kevin Wolf (13):
  monitor: Add Monitor parameter to monitor_set_cpu()
  monitor: Add Monitor parameter to monitor_get_cpu_index()
  monitor: Use getter/setter functions for cur_mon
  hmp: Update current monitor only in handle_hmp_command()
  qmp: Assert that no other monitor is active
  qmp: Call monitor_set_cur() only in qmp_dispatch()
  monitor: Make current monitor a per-coroutine property
  qapi: Add a 'coroutine' flag for commands
  qmp: Move dispatcher to a coroutine
  hmp: Add support for coroutine command handlers
  util/async: Add aio_co_reschedule_self()
  block: Add bdrv_co_move_to_aio_context()
  block: Convert 'block_resize' to coroutine

 qapi/block-core.json                    |   3 +-
 docs/devel/qapi-code-gen.txt            |  12 +++
 include/block/aio.h                     |  10 ++
 include/block/block.h                   |   6 ++
 include/monitor/monitor.h               |   7 +-
 include/qapi/qmp/dispatch.h             |   5 +-
 monitor/monitor-internal.h              |   7 +-
 audio/wavcapture.c                      |   8 +-
 block.c                                 |  10 ++
 blockdev.c                              |  13 ++-
 dump/dump.c                             |   2 +-
 hw/core/machine-hmp-cmds.c              |   2 +-
 hw/scsi/vhost-scsi.c                    |   2 +-
 hw/virtio/vhost-vsock.c                 |   2 +-
 migration/fd.c                          |   4 +-
 monitor/hmp-cmds.c                      |   4 +-
 monitor/hmp.c                           |  44 ++++++--
 monitor/misc.c                          |  38 ++++---
 monitor/monitor.c                       | 101 ++++++++++++++++--
 monitor/qmp-cmds-control.c              |   2 +
 monitor/qmp-cmds.c                      |   2 +-
 monitor/qmp.c                           | 130 +++++++++++++++++-------
 net/socket.c                            |   2 +-
 net/tap.c                               |   6 +-
 qapi/qmp-dispatch.c                     |  61 ++++++++++-
 qapi/qmp-registry.c                     |   3 +
 qga/main.c                              |   2 +-
 softmmu/cpus.c                          |   2 +-
 stubs/monitor-core.c                    |  10 +-
 tests/test-qmp-cmds.c                   |  10 +-
 tests/test-util-sockets.c               |  12 +--
 trace/control.c                         |   2 +-
 util/aio-posix.c                        |   8 +-
 util/async.c                            |  30 ++++++
 util/qemu-error.c                       |   6 +-
 util/qemu-print.c                       |   3 +-
 util/qemu-sockets.c                     |   1 +
 scripts/qapi/commands.py                |  10 +-
 scripts/qapi/doc.py                     |   2 +-
 scripts/qapi/expr.py                    |  10 +-
 scripts/qapi/introspect.py              |   2 +-
 scripts/qapi/schema.py                  |  12 ++-
 tests/qapi-schema/test-qapi.py          |   7 +-
 hmp-commands.hx                         |   1 +
 tests/qapi-schema/meson.build           |   1 +
 tests/qapi-schema/oob-coroutine.err     |   2 +
 tests/qapi-schema/oob-coroutine.json    |   2 +
 tests/qapi-schema/oob-coroutine.out     |   0
 tests/qapi-schema/qapi-schema-test.json |   1 +
 tests/qapi-schema/qapi-schema-test.out  |   2 +
 50 files changed, 481 insertions(+), 143 deletions(-)
 create mode 100644 tests/qapi-schema/oob-coroutine.err
 create mode 100644 tests/qapi-schema/oob-coroutine.json
 create mode 100644 tests/qapi-schema/oob-coroutine.out

Comments

no-reply@patchew.org Sept. 9, 2020, 3:24 p.m. UTC | #1
Patchew URL: https://patchew.org/QEMU/20200909151149.490589-1-kwolf@redhat.com/



Hi,

This series failed the build test on a FreeBSD host. Please find the details below.

=== TEST SCRIPT BEGIN ===
#!/bin/bash
# Testing script will be invoked under the git checkout with
# HEAD pointing to a commit that has the patches applied on top of "base"
# branch
if qemu-system-x86_64 --help >/dev/null 2>&1; then
  QEMU=qemu-system-x86_64
elif /usr/libexec/qemu-kvm --help >/dev/null 2>&1; then
  QEMU=/usr/libexec/qemu-kvm
else
  exit 1
fi
make vm-build-freebsd J=21 QEMU=$QEMU
exit 0
=== TEST SCRIPT END ===




The full log is available at
http://patchew.org/logs/20200909151149.490589-1-kwolf@redhat.com/testing.FreeBSD/?type=message.
---
Email generated automatically by Patchew [https://patchew.org/].
Please send your feedback to patchew-devel@redhat.com
Markus Armbruster Sept. 14, 2020, 3:09 p.m. UTC | #2
Stefan Hajnoczi <stefanha@redhat.com> writes:

> On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
>> Some QMP command handlers can block the main loop for a relatively long
>> time, for example because they perform some I/O. This is quite nasty.
>> Allowing such handlers to run in a coroutine where they can yield (and
>> therefore release the BQL) while waiting for an event such as I/O
>> completion solves the problem.
>> 
>> This series adds the infrastructure to allow this and switches
>> block_resize to run in a coroutine as a first example.
>> 
>> This is an alternative solution to Marc-André's "monitor: add
>> asynchronous command type" series.
>
> Please clarify the following in the QAPI documentation:
>  * Is the QMP monitor suspended while the command is pending?
>  * Are QMP events reported while the command is pending?

Good points.  Kevin, I'd be willing to take this as a follow-up patch,
if that's more convenient for you.

> Acked-by: Stefan Hajnoczi <stefanha@redhat.com>

Stefan, I could use your proper review of PATCH 11-13.  Pretty-please?
Markus Armbruster Sept. 14, 2020, 3:30 p.m. UTC | #3
Kevin Wolf <kwolf@redhat.com> writes:

> This moves the QMP dispatcher to a coroutine and runs all QMP command
> handlers that declare 'coroutine': true in coroutine context so they
> can avoid blocking the main loop while doing I/O or waiting for other
> events.
>
> For commands that are not declared safe to run in a coroutine, the
> dispatcher drops out of coroutine context by calling the QMP command
> handler from a bottom half.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> Reviewed-by: Markus Armbruster <armbru@redhat.com>
> ---
>  include/qapi/qmp/dispatch.h |   1 +
>  monitor/monitor-internal.h  |   6 +-
>  monitor/monitor.c           |  55 +++++++++++++---
>  monitor/qmp.c               | 122 +++++++++++++++++++++++++++---------
>  qapi/qmp-dispatch.c         |  61 ++++++++++++++++--
>  qapi/qmp-registry.c         |   3 +
>  util/aio-posix.c            |   8 ++-
>  7 files changed, 210 insertions(+), 46 deletions(-)
>
> diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
> index 9fd2b720a7..af8d96c570 100644
> --- a/include/qapi/qmp/dispatch.h
> +++ b/include/qapi/qmp/dispatch.h
> @@ -31,6 +31,7 @@ typedef enum QmpCommandOptions
>  typedef struct QmpCommand
>  {
>      const char *name;
> +    /* Runs in coroutine context if QCO_COROUTINE is set */
>      QmpCommandFunc *fn;
>      QmpCommandOptions options;
>      QTAILQ_ENTRY(QmpCommand) node;
> diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h
> index b39e03b744..b55d6df07f 100644
> --- a/monitor/monitor-internal.h
> +++ b/monitor/monitor-internal.h
> @@ -155,7 +155,9 @@ static inline bool monitor_is_qmp(const Monitor *mon)
>  
>  typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
>  extern IOThread *mon_iothread;
> -extern QEMUBH *qmp_dispatcher_bh;
> +extern Coroutine *qmp_dispatcher_co;
> +extern bool qmp_dispatcher_co_shutdown;
> +extern bool qmp_dispatcher_co_busy;
>  extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
>  extern QemuMutex monitor_lock;
>  extern MonitorList mon_list;
> @@ -173,7 +175,7 @@ void monitor_fdsets_cleanup(void);
>  
>  void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
>  void monitor_data_destroy_qmp(MonitorQMP *mon);
> -void monitor_qmp_bh_dispatcher(void *data);
> +void coroutine_fn monitor_qmp_dispatcher_co(void *data);
>  
>  int get_monitor_def(int64_t *pval, const char *name);
>  void help_cmd(Monitor *mon, const char *name);
> diff --git a/monitor/monitor.c b/monitor/monitor.c
> index 629aa073ee..ac2722bf91 100644
> --- a/monitor/monitor.c
> +++ b/monitor/monitor.c
> @@ -55,8 +55,32 @@ typedef struct {
>  /* Shared monitor I/O thread */
>  IOThread *mon_iothread;
>  
> -/* Bottom half to dispatch the requests received from I/O thread */
> -QEMUBH *qmp_dispatcher_bh;
> +/* Coroutine to dispatch the requests received from I/O thread */
> +Coroutine *qmp_dispatcher_co;
> +
> +/* Set to true when the dispatcher coroutine should terminate */
> +bool qmp_dispatcher_co_shutdown;
> +
> +/*
> + * qmp_dispatcher_co_busy is used for synchronisation between the
> + * monitor thread and the main thread to ensure that the dispatcher
> + * coroutine never gets scheduled a second time when it's already
> + * scheduled (scheduling the same coroutine twice is forbidden).
> + *
> + * It is true if the coroutine is active and processing requests.
> + * Additional requests may then be pushed onto mon->qmp_requests,
> + * and @qmp_dispatcher_co_shutdown may be set without further ado.
> + * @qmp_dispatcher_co_busy must not be woken up in this case.
> + *
> + * If false, you also have to set @qmp_dispatcher_co_busy to true and
> + * wake up @qmp_dispatcher_co after pushing the new requests.
> + *
> + * The coroutine will automatically change this variable back to false
> + * before it yields.  Nobody else may set the variable to false.
> + *
> + * Access must be atomic for thread safety.
> + */
> +bool qmp_dispatcher_co_busy;
>  
>  /*
>   * Protects mon_list, monitor_qapi_event_state, coroutine_mon,
> @@ -623,9 +647,24 @@ void monitor_cleanup(void)
>      }
>      qemu_mutex_unlock(&monitor_lock);
>  
> -    /* QEMUBHs needs to be deleted before destroying the I/O thread */
> -    qemu_bh_delete(qmp_dispatcher_bh);
> -    qmp_dispatcher_bh = NULL;
> +    /*
> +     * The dispatcher needs to stop before destroying the I/O thread.
> +     *
> +     * We need to poll both qemu_aio_context and iohandler_ctx to make
> +     * sure that the dispatcher coroutine keeps making progress and
> +     * eventually terminates.  qemu_aio_context is automatically
> +     * polled by calling AIO_WAIT_WHILE on it, but we must poll
> +     * iohandler_ctx manually.
> +     */
> +    qmp_dispatcher_co_shutdown = true;
> +    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> +        aio_co_wake(qmp_dispatcher_co);
> +    }
> +
> +    AIO_WAIT_WHILE(qemu_get_aio_context(),
> +                   (aio_poll(iohandler_get_aio_context(), false),
> +                    atomic_mb_read(&qmp_dispatcher_co_busy)));
> +
>      if (mon_iothread) {
>          iothread_destroy(mon_iothread);
>          mon_iothread = NULL;
> @@ -649,9 +688,9 @@ void monitor_init_globals_core(void)
>       * have commands assuming that context.  It would be nice to get
>       * rid of those assumptions.
>       */
> -    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
> -                                   monitor_qmp_bh_dispatcher,
> -                                   NULL);
> +    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
> +    atomic_mb_set(&qmp_dispatcher_co_busy, true);
> +    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
>  }
>  
>  int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp)
> diff --git a/monitor/qmp.c b/monitor/qmp.c
> index 922fdb5541..69f6e93f38 100644
> --- a/monitor/qmp.c
> +++ b/monitor/qmp.c
> @@ -133,6 +133,10 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
>      }
>  }
>  
> +/*
> + * Runs outside of coroutine context for OOB commands, but in
> + * coroutine context for everything else.
> + */
>  static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
>  {
>      QDict *rsp;
> @@ -205,43 +209,99 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
>      return req_obj;
>  }
>  
> -void monitor_qmp_bh_dispatcher(void *data)
> +void coroutine_fn monitor_qmp_dispatcher_co(void *data)
>  {
> -    QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
> +    QMPRequest *req_obj = NULL;
>      QDict *rsp;
>      bool need_resume;
>      MonitorQMP *mon;
>  
> -    if (!req_obj) {
> -        return;
> -    }
> +    while (true) {
> +        assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
>  
> -    mon = req_obj->mon;
> -    /*  qmp_oob_enabled() might change after "qmp_capabilities" */
> -    need_resume = !qmp_oob_enabled(mon) ||
> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
> -    if (req_obj->req) {
> -        QDict *qdict = qobject_to(QDict, req_obj->req);
> -        QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> -        trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> -        monitor_qmp_dispatch(mon, req_obj->req);
> -    } else {
> -        assert(req_obj->err);
> -        rsp = qmp_error_response(req_obj->err);
> -        req_obj->err = NULL;
> -        monitor_qmp_respond(mon, rsp);
> -        qobject_unref(rsp);
> -    }
> +        /*
> +         * Mark the dispatcher as not busy already here so that we
> +         * don't miss any new requests coming in the middle of our
> +         * processing.
> +         */
> +        atomic_mb_set(&qmp_dispatcher_co_busy, false);
> +
> +        while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
> +            /*
> +             * No more requests to process.  Wait to be reentered from
> +             * handle_qmp_command() when it pushes more requests, or
> +             * from monitor_cleanup() when it requests shutdown.
> +             */
> +            if (!qmp_dispatcher_co_shutdown) {
> +                qemu_coroutine_yield();
> +
> +                /*
> +                 * busy must be set to true again by whoever
> +                 * rescheduled us to avoid double scheduling
> +                 */
> +                assert(atomic_xchg(&qmp_dispatcher_co_busy, false) == true);
> +            }
> +
> +            /*
> +             * qmp_dispatcher_co_shutdown may have changed if we
> +             * yielded and were reentered from monitor_cleanup()
> +             */
> +            if (qmp_dispatcher_co_shutdown) {
> +                return;
> +            }
> +        }
>  
> -    if (need_resume) {
> -        /* Pairs with the monitor_suspend() in handle_qmp_command() */
> -        monitor_resume(&mon->common);
> -    }
> -    qmp_request_free(req_obj);
> +        if (atomic_xchg(&qmp_dispatcher_co_busy, true) == true) {
> +            /*
> +             * Someone rescheduled us (probably because a new requests
> +             * came in), but we didn't actually yield. Do that now,
> +             * only to be immediately reentered and removed from the
> +             * list of scheduled coroutines.
> +             */
> +            qemu_coroutine_yield();
> +        }
>  
> -    /* Reschedule instead of looping so the main loop stays responsive */
> -    qemu_bh_schedule(qmp_dispatcher_bh);
> +        /*
> +         * Move the coroutine from iohandler_ctx to qemu_aio_context for
> +         * executing the command handler so that it can make progress if it
> +         * involves an AIO_WAIT_WHILE().
> +         */
> +        aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
> +        qemu_coroutine_yield();
> +
> +        mon = req_obj->mon;
> +        /* qmp_oob_enabled() might change after "qmp_capabilities" */
> +        need_resume = !qmp_oob_enabled(mon) ||
> +            mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
> +        if (req_obj->req) {
> +            QDict *qdict = qobject_to(QDict, req_obj->req);
> +            QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> +            trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> +            monitor_qmp_dispatch(mon, req_obj->req);
> +        } else {
> +            assert(req_obj->err);
> +            rsp = qmp_error_response(req_obj->err);
> +            req_obj->err = NULL;
> +            monitor_qmp_respond(mon, rsp);
> +            qobject_unref(rsp);
> +        }
> +
> +        if (need_resume) {
> +            /* Pairs with the monitor_suspend() in handle_qmp_command() */
> +            monitor_resume(&mon->common);
> +        }
> +        qmp_request_free(req_obj);
> +
> +        /*
> +         * Yield and reschedule so the main loop stays responsive.
> +         *
> +         * Move back to iohandler_ctx so that nested event loops for
> +         * qemu_aio_context don't start new monitor commands.
> +         */
> +        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
> +        qemu_coroutine_yield();
> +    }
>  }
>  
>  static void handle_qmp_command(void *opaque, QObject *req, Error *err)
> @@ -302,7 +362,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
>      qemu_mutex_unlock(&mon->qmp_queue_lock);
>  
>      /* Kick the dispatcher routine */
> -    qemu_bh_schedule(qmp_dispatcher_bh);
> +    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> +        aio_co_wake(qmp_dispatcher_co);
> +    }
>  }
>  
>  static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
> diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
> index 5677ba92ca..754f7b854c 100644
> --- a/qapi/qmp-dispatch.c
> +++ b/qapi/qmp-dispatch.c
> @@ -12,12 +12,16 @@
>   */
>  
>  #include "qemu/osdep.h"
> +
> +#include "block/aio.h"
>  #include "qapi/error.h"
>  #include "qapi/qmp/dispatch.h"
>  #include "qapi/qmp/qdict.h"
>  #include "qapi/qmp/qjson.h"
>  #include "sysemu/runstate.h"
>  #include "qapi/qmp/qbool.h"
> +#include "qemu/coroutine.h"
> +#include "qemu/main-loop.h"
>  
>  static QDict *qmp_dispatch_check_obj(QDict *dict, bool allow_oob,
>                                       Error **errp)
> @@ -88,6 +92,30 @@ bool qmp_is_oob(const QDict *dict)
>          && !qdict_haskey(dict, "execute");
>  }
>  
> +typedef struct QmpDispatchBH {
> +    const QmpCommand *cmd;
> +    Monitor *cur_mon;
> +    QDict *args;
> +    QObject **ret;
> +    Error **errp;
> +    Coroutine *co;
> +} QmpDispatchBH;
> +
> +static void do_qmp_dispatch_bh(void *opaque)
> +{
> +    QmpDispatchBH *data = opaque;
> +
> +    assert(monitor_cur() == NULL);
> +    monitor_set_cur(qemu_coroutine_self(), data->cur_mon);
> +    data->cmd->fn(data->args, data->ret, data->errp);
> +    monitor_set_cur(qemu_coroutine_self(), NULL);
> +    aio_co_wake(data->co);
> +}
> +
> +/*
> + * Runs outside of coroutine context for OOB commands, but in coroutine
> + * context for everything else.
> + */
>  QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
>                      bool allow_oob, Monitor *cur_mon)
>  {
> @@ -153,12 +181,35 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
>          qobject_ref(args);
>      }
>  
> +    assert(!(oob && qemu_in_coroutine()));
>      assert(monitor_cur() == NULL);
> -    monitor_set_cur(qemu_coroutine_self(), cur_mon);
> -
> -    cmd->fn(args, &ret, &err);
> -
> -    monitor_set_cur(qemu_coroutine_self(), NULL);
> +    if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) {
> +        monitor_set_cur(qemu_coroutine_self(), cur_mon);
> +        cmd->fn(args, &ret, &err);
> +        monitor_set_cur(qemu_coroutine_self(), NULL);
> +    } else {
> +        /*
> +         * Not being in coroutine context implies that we're handling
> +         * an OOB command, which must not have QCO_COROUTINE.
> +         *
> +         * This implies that we are in coroutine context, but the
> +         * command doesn't have QCO_COROUTINE. We must drop out of
> +         * coroutine context for this one.
> +         */

I had to read this several times to get it.  The first sentence leads me
into coroutine context, and then the next sentence tells me the
opposite, throwing me into confusion.

Perhaps something like this:

           /*
            * Actual context doesn't match the one the command needs.
            * Case 1: we are in coroutine context, but command does not
            * have QCO_COROUTINE.  We need to drop out of coroutine
            * context for executing it.
            * Case 2: we are outside coroutine context, but command has
            * QCO_COROUTINE.  Can't actually happen, because we get here
            * outside coroutine context only when executing a command
            * out of band, and OOB commands never have QCO_COROUTINE.
            */

> +        assert(!oob && qemu_in_coroutine() && !(cmd->options & QCO_COROUTINE));
> +
> +        QmpDispatchBH data = {
> +            .cur_mon    = cur_mon,
> +            .cmd        = cmd,
> +            .args       = args,
> +            .ret        = &ret,
> +            .errp       = &err,
> +            .co         = qemu_coroutine_self(),
> +        };
> +        aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh,
> +                                &data);
> +        qemu_coroutine_yield();
> +    }
>      qobject_unref(args);
>      if (err) {
>          /* or assert(!ret) after reviewing all handlers: */
> diff --git a/qapi/qmp-registry.c b/qapi/qmp-registry.c
> index d0f9a1d3e3..58c65b5052 100644
> --- a/qapi/qmp-registry.c
> +++ b/qapi/qmp-registry.c
> @@ -20,6 +20,9 @@ void qmp_register_command(QmpCommandList *cmds, const char *name,
>  {
>      QmpCommand *cmd = g_malloc0(sizeof(*cmd));
>  
> +    /* QCO_COROUTINE and QCO_ALLOW_OOB are incompatible for now */
> +    assert(!((options & QCO_COROUTINE) && (options & QCO_ALLOW_OOB)));
> +
>      cmd->name = name;
>      cmd->fn = fn;
>      cmd->enabled = true;
> diff --git a/util/aio-posix.c b/util/aio-posix.c
> index f7f13ebfc2..30bb21d699 100644
> --- a/util/aio-posix.c
> +++ b/util/aio-posix.c
> @@ -15,6 +15,7 @@
>  
>  #include "qemu/osdep.h"
>  #include "block/block.h"
> +#include "qemu/main-loop.h"
>  #include "qemu/rcu.h"
>  #include "qemu/rcu_queue.h"
>  #include "qemu/sockets.h"
> @@ -558,8 +559,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
>       * There cannot be two concurrent aio_poll calls for the same AioContext (or
>       * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
>       * We rely on this below to avoid slow locked accesses to ctx->notify_me.
> +     *
> +     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
> +     * is special in that it runs in the main thread, but that thread's context
> +     * is qemu_aio_context.
>       */
> -    assert(in_aio_context_home_thread(ctx));
> +    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
> +                                      qemu_get_aio_context() : ctx));
>  
>      qemu_lockcnt_inc(&ctx->list_lock);
Stefan Hajnoczi Sept. 15, 2020, 2:31 p.m. UTC | #4
On Wed, Sep 09, 2020 at 05:11:48PM +0200, Kevin Wolf wrote:
> Add a function to move the current coroutine to the AioContext of a
> given BlockDriverState.
> 
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  include/block/block.h |  6 ++++++
>  block.c               | 10 ++++++++++
>  2 files changed, 16 insertions(+)
> 
> diff --git a/include/block/block.h b/include/block/block.h
> index 981ab5b314..80ab322f11 100644
> --- a/include/block/block.h
> +++ b/include/block/block.h
> @@ -626,6 +626,12 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
>   */
>  AioContext *bdrv_get_aio_context(BlockDriverState *bs);
>  
> +/**
> + * Move the current coroutine to the AioContext of @bs and return the old
> + * AioContext of the coroutine.
> + */
> +AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs);

I'm not sure this function handles all cases:
1. Being called without the BQL (i.e. from an IOThread).
2. Being called while a device stops using its IOThread.

The races that come to mind are fetching the AioContext for bs and then
scheduling a BH. The BH is executed later on by the event loop. There
might be cases where the AioContext for bs is updated before the BH
runs.
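
For concreteness, a minimal sketch of the pattern under discussion
(assuming it builds on aio_co_reschedule_self() from patch 11; this is
not the literal patch body):

    AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs)
    {
        AioContext *old_ctx = qemu_get_current_aio_context();

        /* bs' AioContext is fetched here ... */
        AioContext *new_ctx = bdrv_get_aio_context(bs);

        /*
         * ... but the move happens via a BH that runs later; that gap
         * is the window in which bs' AioContext could still change.
         */
        aio_co_reschedule_self(new_ctx);
        return old_ctx;
    }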

I didn't investigate these cases but wanted to mention them in case you
want to add doc comments about when this function can be used or if
you'd like to verify them yourself.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Stefan Hajnoczi Sept. 15, 2020, 2:58 p.m. UTC | #5
On Mon, Sep 14, 2020 at 05:09:49PM +0200, Markus Armbruster wrote:
> Stefan Hajnoczi <stefanha@redhat.com> writes:
> 
> > On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
> >> Some QMP command handlers can block the main loop for a relatively long
> >> time, for example because they perform some I/O. This is quite nasty.
> >> Allowing such handlers to run in a coroutine where they can yield (and
> >> therefore release the BQL) while waiting for an event such as I/O
> >> completion solves the problem.
> >> 
> >> This series adds the infrastructure to allow this and switches
> >> block_resize to run in a coroutine as a first example.
> >> 
> >> This is an alternative solution to Marc-André's "monitor: add
> >> asynchronous command type" series.
> >
> > Please clarify the following in the QAPI documentation:
> >  * Is the QMP monitor suspended while the command is pending?
> >  * Are QMP events reported while the command is pending?
> 
> Good points.  Kevin, I'd be willing to take this as a follow-up patch,
> if that's more convenient for you.
> 
> > Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
> 
> Stefan, I could use your proper review of PATCH 11-13.  Pretty-please?

Sounds good. I have reviewed patches 11-13 and left questions for
Kevin.
Kevin Wolf Sept. 25, 2020, 3:38 p.m. UTC | #6
Am 14.09.2020 um 17:30 hat Markus Armbruster geschrieben:
> Kevin Wolf <kwolf@redhat.com> writes:
> 
> > This moves the QMP dispatcher to a coroutine and runs all QMP command
> > handlers that declare 'coroutine': true in coroutine context so they
> > can avoid blocking the main loop while doing I/O or waiting for other
> > events.
> >
> > For commands that are not declared safe to run in a coroutine, the
> > dispatcher drops out of coroutine context by calling the QMP command
> > handler from a bottom half.
[...]
> > +    } else {
> > +        /*
> > +         * Not being in coroutine context implies that we're handling
> > +         * an OOB command, which must not have QCO_COROUTINE.
> > +         *
> > +         * This implies that we are in coroutine context, but the
> > +         * command doesn't have QCO_COROUTINE. We must drop out of
> > +         * coroutine context for this one.
> > +         */
> 
> I had to read this several times to get it.  The first sentence leads me
> into coroutine context, and then the next sentence tells me the
> opposite, throwing me into confusion.
> 
> Perhaps something like this:
> 
>            /*
>             * Actual context doesn't match the one the command needs.
>             * Case 1: we are in coroutine context, but command does not
>             * have QCO_COROUTINE.  We need to drop out of coroutine
>             * context for executing it.
>             * Case 2: we are outside coroutine context, but command has
>             * QCO_COROUTINE.  Can't actually happen, because we get here
>             * outside coroutine context only when executing a command
>             * out of band, and OOB commands never have QCO_COROUTINE.
>             */

Works for me. Can you squash this in while applying?

Kevin
Kevin Wolf Sept. 25, 2020, 4 p.m. UTC | #7
Am 15.09.2020 um 16:31 hat Stefan Hajnoczi geschrieben:
> On Wed, Sep 09, 2020 at 05:11:48PM +0200, Kevin Wolf wrote:
> > Add a function to move the current coroutine to the AioContext of a
> > given BlockDriverState.
> > 
> > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> > ---
> >  include/block/block.h |  6 ++++++
> >  block.c               | 10 ++++++++++
> >  2 files changed, 16 insertions(+)
> > 
> > diff --git a/include/block/block.h b/include/block/block.h
> > index 981ab5b314..80ab322f11 100644
> > --- a/include/block/block.h
> > +++ b/include/block/block.h
> > @@ -626,6 +626,12 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
> >   */
> >  AioContext *bdrv_get_aio_context(BlockDriverState *bs);
> >  
> > +/**
> > + * Move the current coroutine to the AioContext of @bs and return the old
> > + * AioContext of the coroutine.
> > + */
> > +AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs);
> 
> I'm not sure this function handles all cases:
> 1. Being called without the BQL (i.e. from an IOThread).
> 2. Being called while a device stops using its IOThread.
> 
> The races that come to mind are fetching the AioContext for bs and then
> scheduling a BH. The BH is executed later on by the event loop. There
> might be cases where the AioContext for bs is updated before the BH
> runs.

Updating the AioContext of a node drains the BDS first, so it would
execute any BHs still pending in the old AioContext. So this part looks
safe to me.

I'm not sure what you mean with the BQL part. I don't think I'm
accessing anything protected by the BQL?

> I didn't investigate these cases but wanted to mention them in case you
> want to add doc comments about when this function can be used or if
> you'd like to verify them yourself.

One thing that I'm not completely sure about (but that isn't really
related to this specific patch) is AioContext changes later in the
process when the actual command handler has yielded. We may have to be
careful to prevent those for coroutine based QMP commands in the block
layer.

By sheer luck, qmp_block_resize() creates a new BlockBackend that has
blk->allow_aio_context_change = false. So we're actually good in the
only command I'm converting. Phew.

Kevin
Kevin Wolf Sept. 25, 2020, 5:15 p.m. UTC | #8
Am 10.09.2020 um 15:24 hat Stefan Hajnoczi geschrieben:
> On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
> > Some QMP command handlers can block the main loop for a relatively long
> > time, for example because they perform some I/O. This is quite nasty.
> > Allowing such handlers to run in a coroutine where they can yield (and
> > therefore release the BQL) while waiting for an event such as I/O
> > completion solves the problem.
> > 
> > This series adds the infrastructure to allow this and switches
> > block_resize to run in a coroutine as a first example.
> > 
> > This is an alternative solution to Marc-André's "monitor: add
> > asynchronous command type" series.
> 
> Please clarify the following in the QAPI documentation:
>  * Is the QMP monitor suspended while the command is pending?

Suspended as in monitor_suspend()? No.

>  * Are QMP events reported while the command is pending?

Hm, I don't know to be honest. But I think so.

Does it matter, though? I don't think events have a defined order
compared to command results, and the client can't respond to the event
anyway until the current command has completed.

Kevin
Markus Armbruster Sept. 28, 2020, 8:24 a.m. UTC | #9
Kevin Wolf <kwolf@redhat.com> writes:

> Am 14.09.2020 um 17:30 hat Markus Armbruster geschrieben:
>> Kevin Wolf <kwolf@redhat.com> writes:
>> 
>> > This moves the QMP dispatcher to a coroutine and runs all QMP command
>> > handlers that declare 'coroutine': true in coroutine context so they
>> > can avoid blocking the main loop while doing I/O or waiting for other
>> > events.
>> >
>> > For commands that are not declared safe to run in a coroutine, the
>> > dispatcher drops out of coroutine context by calling the QMP command
>> > handler from a bottom half.
>> >
>> > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
>> > Reviewed-by: Markus Armbruster <armbru@redhat.com>
>> > ---
[...]
>> > diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
>> > index 5677ba92ca..754f7b854c 100644
>> > --- a/qapi/qmp-dispatch.c
>> > +++ b/qapi/qmp-dispatch.c
[...]
>> > @@ -153,12 +181,35 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
>> >          qobject_ref(args);
>> >      }
>> >  
>> > +    assert(!(oob && qemu_in_coroutine()));
>> >      assert(monitor_cur() == NULL);
>> > -    monitor_set_cur(qemu_coroutine_self(), cur_mon);
>> > -
>> > -    cmd->fn(args, &ret, &err);
>> > -
>> > -    monitor_set_cur(qemu_coroutine_self(), NULL);
>> > +    if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) {
>> > +        monitor_set_cur(qemu_coroutine_self(), cur_mon);
>> > +        cmd->fn(args, &ret, &err);
>> > +        monitor_set_cur(qemu_coroutine_self(), NULL);
>> > +    } else {
>> > +        /*
>> > +         * Not being in coroutine context implies that we're handling
>> > +         * an OOB command, which must not have QCO_COROUTINE.
>> > +         *
>> > +         * This implies that we are in coroutine context, but the
>> > +         * command doesn't have QCO_COROUTINE. We must drop out of
>> > +         * coroutine context for this one.
>> > +         */
>> 
>> I had to read this several times to get it.  The first sentence leads me
>> into coroutine context, and then the next sentence tells me the
>> opposite, throwing me into confusion.
>> 
>> Perhaps something like this:
>> 
>>            /*
>>             * Actual context doesn't match the one the command needs.
>>             * Case 1: we are in coroutine context, but command does not
>>             * have QCO_COROUTINE.  We need to drop out of coroutine
>>             * context for executing it.
>>             * Case 2: we are outside coroutine context, but command has
>>             * QCO_COROUTINE.  Can't actually happen, because we get here
>>             * outside coroutine context only when executing a command
>>             * out of band, and OOB commands never have QCO_COROUTINE.
>>             */
>
> Works for me. Can you squash this in while applying?

Sure!
Stefan Hajnoczi Sept. 28, 2020, 8:46 a.m. UTC | #10
On Fri, Sep 25, 2020 at 07:15:41PM +0200, Kevin Wolf wrote:
> Am 10.09.2020 um 15:24 hat Stefan Hajnoczi geschrieben:
> > On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
> > > Some QMP command handlers can block the main loop for a relatively long
> > > time, for example because they perform some I/O. This is quite nasty.
> > > Allowing such handlers to run in a coroutine where they can yield (and
> > > therefore release the BQL) while waiting for an event such as I/O
> > > completion solves the problem.
> > > 
> > > This series adds the infrastructure to allow this and switches
> > > block_resize to run in a coroutine as a first example.
> > > 
> > > This is an alternative solution to Marc-André's "monitor: add
> > > asynchronous command type" series.
> > 
> > Please clarify the following in the QAPI documentation:
> >  * Is the QMP monitor suspended while the command is pending?
> 
> Suspended as in monitor_suspend()? No.
> 
> >  * Are QMP events reported while the command is pending?
> 
> Hm, I don't know to be honest. But I think so.
> 
> Does it matter, though? I don't think events have a defined order
> compared to command results, and the client can't respond to the event
> anyway until the current command has completed.

You're right, I don't think it matters because the client must expect
QMP events at any time.

I was trying to understand the semantics of coroutine monitor commands
from two perspectives:

1. The QMP client - do coroutine commands behave differently from
   non-coroutine commands? I think the answer is no. The monitor will
   not process more commands until the coroutine finishes?

2. The command implementation - which thread does the coroutine run in?
   I guess it runs in the main loop thread with the BQL and the
   AioContext acquired?
Stefan Hajnoczi Sept. 28, 2020, 8:59 a.m. UTC | #11
On Fri, Sep 25, 2020 at 06:00:51PM +0200, Kevin Wolf wrote:
> Am 15.09.2020 um 16:31 hat Stefan Hajnoczi geschrieben:
> > On Wed, Sep 09, 2020 at 05:11:48PM +0200, Kevin Wolf wrote:
> > > Add a function to move the current coroutine to the AioContext of a
> > > given BlockDriverState.
> > > 
> > > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> > > ---
> > >  include/block/block.h |  6 ++++++
> > >  block.c               | 10 ++++++++++
> > >  2 files changed, 16 insertions(+)
> > > 
> > > diff --git a/include/block/block.h b/include/block/block.h
> > > index 981ab5b314..80ab322f11 100644
> > > --- a/include/block/block.h
> > > +++ b/include/block/block.h
> > > @@ -626,6 +626,12 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
> > >   */
> > >  AioContext *bdrv_get_aio_context(BlockDriverState *bs);
> > >  
> > > +/**
> > > + * Move the current coroutine to the AioContext of @bs and return the old
> > > + * AioContext of the coroutine.
> > > + */
> > > +AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs);
> > 
> > I'm not sure this function handles all cases:
> > 1. Being called without the BQL (i.e. from an IOThread).
> > 2. Being called while a device stops using its IOThread.
> > 
> > The races that come to mind are fetching the AioContext for bs and then
> > scheduling a BH. The BH is executed later on by the event loop. There
> > might be cases where the AioContext for bs is updated before the BH
> > runs.

The scenario I'm thinking about is where bs' AioContext changes while we
are trying to move there.

There is a window of time between fetching bs' AioContext, scheduling a
BH in our old AioContext (not in bs' AioContext), and then scheduling
the coroutine into the AioContext we previously fetched for bs.

Is it possible for the AioContext value we stashed to be outdated by the
time we use it?

I think the answer is that it's safe to use this function from the main
loop thread under the BQL. That way nothing else will change bs'
AioContext while we're running. But it's probably not safe to use this
function from an arbitrary IOThread (without the BQL).

I think this limitation is okay but it needs to be documented. Maybe an
assertion can verify that it holds.
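
Something like this at the top of the function, perhaps (a sketch of
the assertion idea; qemu_mutex_iothread_locked() is the existing check
for holding the BQL):

    /* Only safe from the main loop thread under the BQL, see above */
    assert(qemu_mutex_iothread_locked());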

Stefan
Stefan Hajnoczi Sept. 28, 2020, 9:05 a.m. UTC | #12
On Fri, Sep 25, 2020 at 06:07:50PM +0200, Kevin Wolf wrote:
> Am 15.09.2020 um 16:57 hat Stefan Hajnoczi geschrieben:
> > On Wed, Sep 09, 2020 at 05:11:49PM +0200, Kevin Wolf wrote:
> > > @@ -2456,8 +2456,7 @@ void qmp_block_resize(bool has_device, const char *device,
> > >          return;
> > >      }
> > >  
> > > -    aio_context = bdrv_get_aio_context(bs);
> > > -    aio_context_acquire(aio_context);
> > > +    old_ctx = bdrv_co_move_to_aio_context(bs);
> > >  
> > >      if (size < 0) {
> > >          error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "size", "a >0 size");
> > 
> > Is it safe to call blk_new() outside the BQL since it mutates global state?
> > 
> > In other words, could another thread race with us?
> 
> Hm, probably not.
> 
> Would it be safer to have the bdrv_co_move_to_aio_context() call only
> immediately before the drain?

Yes, sounds good.

> > > @@ -2479,8 +2478,8 @@ void qmp_block_resize(bool has_device, const char *device,
> > >      bdrv_drained_end(bs);
> > >  
> > >  out:
> > > +    aio_co_reschedule_self(old_ctx);
> > >      blk_unref(blk);
> > > -    aio_context_release(aio_context);
> > 
> > The following precondition is violated by the blk_unref -> bdrv_drain ->
> > AIO_WAIT_WHILE() call if blk->refcnt is 1 here:
> > 
> >  * The caller's thread must be the IOThread that owns @ctx or the main loop
> >  * thread (with @ctx acquired exactly once).
> > 
> > blk_unref() is called from the main loop thread without having acquired
> > blk's AioContext.
> > 
> > Normally blk->refcnt will be > 1 so bdrv_drain() won't be called, but
> > I'm not sure if that can be guaranteed.
> > 
> > The following seems safer although it's uglier:
> > 
> >   aio_context = bdrv_get_aio_context(bs);
> >   aio_context_acquire(aio_context);
> >   blk_unref(blk);
> >   aio_context_release(aio_context);
> 
> May we actually acquire aio_context if blk is in the main thread? I
> think we must only do this if it's in a different iothread because we'd
> end up with a recursive lock and drain would hang.

Right :). Maybe an aio_context_acquire_once() API would help.

Stefan
Markus Armbruster Sept. 28, 2020, 9:30 a.m. UTC | #13
Kevin Wolf <kwolf@redhat.com> writes:

> Am 10.09.2020 um 15:24 hat Stefan Hajnoczi geschrieben:
>> On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
>> > Some QMP command handlers can block the main loop for a relatively long
>> > time, for example because they perform some I/O. This is quite nasty.
>> > Allowing such handlers to run in a coroutine where they can yield (and
>> > therefore release the BQL) while waiting for an event such as I/O
>> > completion solves the problem.
>> > 
>> > This series adds the infrastructure to allow this and switches
>> > block_resize to run in a coroutine as a first example.
>> > 
>> > This is an alternative solution to Marc-André's "monitor: add
>> > asynchronous command type" series.
>> 
>> Please clarify the following in the QAPI documentation:
>>  * Is the QMP monitor suspended while the command is pending?
>
> Suspended as in monitor_suspend()? No.

A suspended monitor doesn't read monitor input.

We suspend

* a QMP monitor while the request queue is full

* an HMP monitor while it executes a command

* a multiplexed HMP monitor while the "mux-focus" is elsewhere

* an HMP monitor when it executes command "quit", forever

* an HMP monitor while it executes command "migrate" without -d

Let me explain the first item in a bit more detail.  Before OOB, a QMP
monitor was also suspended while it executed a command.  To make OOB
work, we moved the QMP monitors to an I/O thread and added a request
queue, drained by the main loop.  QMP monitors continue to read
commands, execute right away if OOB, else queue, suspend when queue gets
full, resume when it gets non-full.

The "run command in coroutine context" feature does not affect any of
this.

qapi-code-gen.txt does not talk about monitor suspension at all.  It's
instead discussed in qmp-spec.txt section 2.3.1 Out-of-band execution.

Stefan, what would you like us to clarify, and where?

>>  * Are QMP events reported while the command is pending?
>
> Hm, I don't know to be honest. But I think so.

Yes, events should be reported while a command is being executed.

Sending events takes locks.  Their critical sections are all
short-lived.  Another possible delay is the underlying character device
failing the send with EAGAIN.  That's all.

Fine print: qapi_event_emit() takes @monitor_lock.  It sends to each QMP
monitor with qmp_send_response(), which uses monitor_puts(), which takes
the monitor's @mon_lock.

The "run command in coroutine context" feature does not affect any of
this.

> Does it matter, though? I don't think events have a defined order
> compared to command results, and the client can't respond to the event
> anyway until the current command has completed.

Stefan, what would you like us to clarify, and where?
Kevin Wolf Sept. 28, 2020, 9:47 a.m. UTC | #14
Am 28.09.2020 um 10:46 hat Stefan Hajnoczi geschrieben:
> On Fri, Sep 25, 2020 at 07:15:41PM +0200, Kevin Wolf wrote:
> > Am 10.09.2020 um 15:24 hat Stefan Hajnoczi geschrieben:
> > > On Wed, Sep 09, 2020 at 05:11:36PM +0200, Kevin Wolf wrote:
> > > > Some QMP command handlers can block the main loop for a relatively long
> > > > time, for example because they perform some I/O. This is quite nasty.
> > > > Allowing such handlers to run in a coroutine where they can yield (and
> > > > therefore release the BQL) while waiting for an event such as I/O
> > > > completion solves the problem.
> > > > 
> > > > This series adds the infrastructure to allow this and switches
> > > > block_resize to run in a coroutine as a first example.
> > > > 
> > > > This is an alternative solution to Marc-André's "monitor: add
> > > > asynchronous command type" series.
> > > 
> > > Please clarify the following in the QAPI documentation:
> > >  * Is the QMP monitor suspended while the command is pending?
> > 
> > Suspended as in monitor_suspend()? No.
> > 
> > >  * Are QMP events reported while the command is pending?
> > 
> > Hm, I don't know to be honest. But I think so.
> > 
> > Does it matter, though? I don't think events have a defined order
> > compared to command results, and the client can't respond to the event
> > anyway until the current command has completed.
> 
> You're right, I don't think it matters because the client must expect
> QMP events at any time.
> 
> I was trying to understand the semantics of coroutine monitor commands
> from two perspectives:
> 
> 1. The QMP client - do coroutine commands behave differently from
>    non-coroutine commands? I think the answer is no. The monitor will
>    not process more commands until the coroutine finishes?

No, on the wire things should look exactly the same. If you consider
more than just the QMP traffic, e.g. a client that also communicates
with the guest, it might of course notice that the guest isn't blocked
any more.
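
For example, a (made-up) block_resize exchange would be byte-for-byte
identical with or without the coroutine flag:

    -> { "execute": "block_resize",
         "arguments": { "device": "ide0-hd0", "size": 1073741824 } }
    <- { "return": {} }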

> 2. The command implementation - which thread does the coroutine run in?
>    I guess it runs in the main loop thread with the BQL and the
>    AioContext acquired?

By default, yes. But the coroutine can reschedule itself to another
thread. Block-related handlers will want to reschedule themselves to
bs->aio_context because you can't just hold the AioContext lock from
another thread across yields. This is what block_resize does after this
series.
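
Heavily abbreviated, the shape of the handler after this series is:

    void qmp_block_resize(bool has_device, const char *device, ...)
    {
        ...
        old_ctx = bdrv_co_move_to_aio_context(bs);   /* hop to bs' context */
        ...                                          /* resize under drain */
    out:
        aio_co_reschedule_self(old_ctx);             /* hop back */
        blk_unref(blk);
    }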

Kevin
Kevin Wolf Sept. 28, 2020, 10:21 a.m. UTC | #15
Am 28.09.2020 um 10:59 hat Stefan Hajnoczi geschrieben:
> On Fri, Sep 25, 2020 at 06:00:51PM +0200, Kevin Wolf wrote:
> > Am 15.09.2020 um 16:31 hat Stefan Hajnoczi geschrieben:
> > > On Wed, Sep 09, 2020 at 05:11:48PM +0200, Kevin Wolf wrote:
> > > > Add a function to move the current coroutine to the AioContext of a
> > > > given BlockDriverState.
> > > > 
> > > > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> > > > ---
> > > >  include/block/block.h |  6 ++++++
> > > >  block.c               | 10 ++++++++++
> > > >  2 files changed, 16 insertions(+)
> > > > 
> > > > diff --git a/include/block/block.h b/include/block/block.h
> > > > index 981ab5b314..80ab322f11 100644
> > > > --- a/include/block/block.h
> > > > +++ b/include/block/block.h
> > > > @@ -626,6 +626,12 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
> > > >   */
> > > >  AioContext *bdrv_get_aio_context(BlockDriverState *bs);
> > > >  
> > > > +/**
> > > > + * Move the current coroutine to the AioContext of @bs and return the old
> > > > + * AioContext of the coroutine.
> > > > + */
> > > > +AioContext *coroutine_fn bdrv_co_move_to_aio_context(BlockDriverState *bs);
> > > 
> > > I'm not sure this function handles all cases:
> > > 1. Being called without the BQL (i.e. from an IOThread).
> > > 2. Being called while a device stops using its IOThread.
> > > 
> > > The races that come to mind are fetching the AioContext for bs and then
> > > scheduling a BH. The BH is executed later on by the event loop. There
> > > might be cases where the AioContext for bs is updated before the BH
> > > runs.
> 
> The scenario I'm thinking about is where bs' AioContext changes while we
> are trying to move there.
> 
> There is a window of time between fetching bs' AioContext, scheduling a
> BH in our old AioContext (not in bs' AioContext), and then scheduling
> the coroutine into the AioContext we previously fetched for bs.
> 
> Is it possible for the AioContext value we stashed to be outdated by the
> time we use it?
> 
> I think the answer is that it's safe to use this function from the main
> loop thread under the BQL. That way nothing else will change bs'
> AioContext while we're running. But it's probably not safe to use this
> function from an arbitrary IOThread (without the BQL).

It's probably safest to treat it as such. The first part of it (the
window between fetching bs->aio_context and using it) is actually also
true for this ubiquitous sequence:

    AioContext *ctx = bdrv_get_aio_context(bs);
    aio_context_acquire(ctx);

I never really thought about this, but this is only safe in the main
thread. Most of its users are of course monitor command handlers, which
always run in the main thread.

> I think this limitation is okay but it needs to be documented. Maybe an
> assertion can verify that it holds.

Yes, why not.

Maybe we should actually change the interface into a pair of
bdrv_co_enter/leave() functions that also increase bs->in_flight so that
the whole section will complete before the AioContext of bs changes
(changing the AioContext while the handler coroutine has yielded and will
continue to run in the old context would be bad).

block_resize is safe without it, but it might be better to introduce
patterns that will be safe without being extra careful in each command.
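
As an untested sketch of that interface (assuming bdrv_inc_in_flight()
and bdrv_dec_in_flight() are usable for this):

    AioContext *coroutine_fn bdrv_co_enter(BlockDriverState *bs)
    {
        AioContext *old_ctx = qemu_get_current_aio_context();

        /* Keep bs' AioContext stable until the matching bdrv_co_leave() */
        bdrv_inc_in_flight(bs);
        aio_co_reschedule_self(bdrv_get_aio_context(bs));
        return old_ctx;
    }

    void coroutine_fn bdrv_co_leave(BlockDriverState *bs, AioContext *old_ctx)
    {
        aio_co_reschedule_self(old_ctx);
        bdrv_dec_in_flight(bs);
    }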

Kevin
Kevin Wolf Sept. 28, 2020, 10:33 a.m. UTC | #16
Am 28.09.2020 um 11:05 hat Stefan Hajnoczi geschrieben:
> On Fri, Sep 25, 2020 at 06:07:50PM +0200, Kevin Wolf wrote:
> > Am 15.09.2020 um 16:57 hat Stefan Hajnoczi geschrieben:
> > > On Wed, Sep 09, 2020 at 05:11:49PM +0200, Kevin Wolf wrote:
> > > > @@ -2456,8 +2456,7 @@ void qmp_block_resize(bool has_device, const char *device,
> > > >          return;
> > > >      }
> > > >  
> > > > -    aio_context = bdrv_get_aio_context(bs);
> > > > -    aio_context_acquire(aio_context);
> > > > +    old_ctx = bdrv_co_move_to_aio_context(bs);
> > > >  
> > > >      if (size < 0) {
> > > >          error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "size", "a >0 size");
> > > 
> > > Is it safe to call blk_new() outside the BQL since it mutates global state?
> > > 
> > > In other words, could another thread race with us?
> > 
> > Hm, probably not.
> > 
> > Would it be safer to have the bdrv_co_move_to_aio_context() call only
> > immediately before the drain?
> 
> Yes, sounds good.
> 
> > > > @@ -2479,8 +2478,8 @@ void qmp_block_resize(bool has_device, const char *device,
> > > >      bdrv_drained_end(bs);
> > > >  
> > > >  out:
> > > > +    aio_co_reschedule_self(old_ctx);
> > > >      blk_unref(blk);
> > > > -    aio_context_release(aio_context);
> > > 
> > > The following precondition is violated by the blk_unref -> bdrv_drain ->
> > > AIO_WAIT_WHILE() call if blk->refcnt is 1 here:
> > > 
> > >  * The caller's thread must be the IOThread that owns @ctx or the main loop
> > >  * thread (with @ctx acquired exactly once).
> > > 
> > > blk_unref() is called from the main loop thread without having acquired
> > > blk's AioContext.
> > > 
> > > Normally blk->refcnt will be > 1 so bdrv_drain() won't be called, but
> > > I'm not sure if that can be guaranteed.
> > > 
> > > The following seems safer although it's uglier:
> > > 
> > >   aio_context = bdrv_get_aio_context(bs);
> > >   aio_context_acquire(aio_context);
> > >   blk_unref(blk);
> > >   aio_context_release(aio_context);
> > 
> > May we actually acquire aio_context if blk is in the main thread? I
> > think we must only do this if it's in a different iothread because we'd
> > end up with a recursive lock and drain would hang.
> 
> Right :). Maybe an aio_context_acquire_once() API would help.

If you want it to work in the general case, how would you implement
this? As far as I know there is no way to tell whether we already own
the lock or not.

Something like aio_context_acquire_unless_self() might be easier to
implement.
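
For instance (hypothetical; this compares against the calling thread's
home context rather than tracking lock ownership):

    /* Acquire ctx unless the calling thread already runs in it.
     * Returns the context to release later, or NULL if nothing was locked. */
    static AioContext *aio_context_acquire_unless_self(AioContext *ctx)
    {
        if (ctx == qemu_get_current_aio_context()) {
            return NULL;
        }
        aio_context_acquire(ctx);
        return ctx;
    }

The caller would then release the lock only if it got a non-NULL context
back.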

Kevin
Markus Armbruster Oct. 2, 2020, 7:51 a.m. UTC | #17
Additional nitpick detail on Kevin's request.

Kevin Wolf <kwolf@redhat.com> writes:

> cur_mon really needs to be coroutine-local as soon as we move monitor
> command handlers to coroutines and let them yield. As a first step, just
> remove all direct accesses to cur_mon so that we can implement this in
> the getter function later.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
[...]
> diff --git a/tests/test-util-sockets.c b/tests/test-util-sockets.c
> index af9f5c0c70..6c50dbf051 100644
> --- a/tests/test-util-sockets.c
> +++ b/tests/test-util-sockets.c
> @@ -52,6 +52,7 @@ static void test_fd_is_socket_good(void)
>  
>  static int mon_fd = -1;
>  static const char *mon_fdname;
> +__thread Monitor *cur_mon;
>  
>  int monitor_get_fd(Monitor *mon, const char *fdname, Error **errp)
>  {
> @@ -66,15 +67,12 @@ int monitor_get_fd(Monitor *mon, const char *fdname, Error **errp)
>  
>  /*
>   * Syms of stubs in libqemuutil.a are discarded at .o file granularity.
> - * To replace monitor_get_fd() we must ensure everything in
> - * stubs/monitor.c is defined, to make sure monitor.o is discarded
> - * otherwise we get duplicate syms at link time.
> + * To replace monitor_get_fd() and monitor_cur(), we must ensure that we also
> + * replace any other symbol that is used in the binary and would be taken from
> + * the same stub object file, otherwise we get duplicate syms at link time.

Wrapping the comment around column 70 or so would make it easier to
read.  File has no maintainers.  Up to you.

>   */
> -__thread Monitor *cur_mon;
> +Monitor *monitor_cur(void) { return cur_mon; }
>  int monitor_vprintf(Monitor *mon, const char *fmt, va_list ap) { abort(); }
> -void monitor_init_qmp(Chardev *chr, bool pretty, Error **errp) {}
> -void monitor_init_hmp(Chardev *chr, bool use_readline, Error **errp) {}
> -
>  
>  static void test_socket_fd_pass_name_good(void)
>  {
[...]
Markus Armbruster Oct. 2, 2020, 8:01 a.m. UTC | #18
Additional nitpick detail on Kevin's request.

Kevin Wolf <kwolf@redhat.com> writes:

> This moves the QMP dispatcher to a coroutine and runs all QMP command
> handlers that declare 'coroutine': true in coroutine context so they
> can avoid blocking the main loop while doing I/O or waiting for other
> events.
>
> For commands that are not declared safe to run in a coroutine, the
> dispatcher drops out of coroutine context by calling the QMP command
> handler from a bottom half.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> Reviewed-by: Markus Armbruster <armbru@redhat.com>
[...]
> diff --git a/monitor/monitor.c b/monitor/monitor.c
> index 629aa073ee..ac2722bf91 100644
> --- a/monitor/monitor.c
> +++ b/monitor/monitor.c
> @@ -55,8 +55,32 @@ typedef struct {
>  /* Shared monitor I/O thread */
>  IOThread *mon_iothread;
>  
> -/* Bottom half to dispatch the requests received from I/O thread */
> -QEMUBH *qmp_dispatcher_bh;
> +/* Coroutine to dispatch the requests received from I/O thread */
> +Coroutine *qmp_dispatcher_co;
> +
> +/* Set to true when the dispatcher coroutine should terminate */
> +bool qmp_dispatcher_co_shutdown;
> +
> +/*
> + * qmp_dispatcher_co_busy is used for synchronisation between the
> + * monitor thread and the main thread to ensure that the dispatcher
> + * coroutine never gets scheduled a second time when it's already
> + * scheduled (scheduling the same coroutine twice is forbidden).
> + *
> + * It is true if the coroutine is active and processing requests.
> + * Additional requests may then be pushed onto mon->qmp_requests,
> + * and @qmp_dispatcher_co_shutdown may be set without further ado.
> + * @qmp_dispatcher_co_busy must not be woken up in this case.
> + *
> + * If false, you also have to set @qmp_dispatcher_co_busy to true and
> + * wake up @qmp_dispatcher_co after pushing the new requests.
> + *
> + * The coroutine will automatically change this variable back to false
> + * before it yields.  Nobody else may set the variable to false.
> + *
> + * Access must be atomic for thread safety.
> + */
> +bool qmp_dispatcher_co_busy;
>  
>  /*
>   * Protects mon_list, monitor_qapi_event_state, coroutine_mon,
> @@ -623,9 +647,24 @@ void monitor_cleanup(void)
>      }
>      qemu_mutex_unlock(&monitor_lock);
>  
> -    /* QEMUBHs needs to be deleted before destroying the I/O thread */
> -    qemu_bh_delete(qmp_dispatcher_bh);
> -    qmp_dispatcher_bh = NULL;
> +    /*
> +     * The dispatcher needs to stop before destroying the I/O thread.
> +     *
> +     * We need to poll both qemu_aio_context and iohandler_ctx to make
> +     * sure that the dispatcher coroutine keeps making progress and
> +     * eventually terminates.  qemu_aio_context is automatically
> +     * polled by calling AIO_WAIT_WHILE on it, but we must poll
> +     * iohandler_ctx manually.
> +     */
> +    qmp_dispatcher_co_shutdown = true;
> +    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> +        aio_co_wake(qmp_dispatcher_co);
> +    }
> +
> +    AIO_WAIT_WHILE(qemu_get_aio_context(),
> +                   (aio_poll(iohandler_get_aio_context(), false),
> +                    atomic_mb_read(&qmp_dispatcher_co_busy)));
> +
>      if (mon_iothread) {
>          iothread_destroy(mon_iothread);
>          mon_iothread = NULL;
> @@ -649,9 +688,9 @@ void monitor_init_globals_core(void)
>       * have commands assuming that context.  It would be nice to get
>       * rid of those assumptions.
>       */
> -    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
> -                                   monitor_qmp_bh_dispatcher,
> -                                   NULL);
> +    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);

Rather long line, caused by rather long identifiers.

Not your fault; you imitated the existing pattern static variable
qmp_dispatcher_bh / extern function monitor_qmp_bh_dispatcher().  But
the prefix monitor_qmp_ is awfully long, and not just here.  Let's leave
this for another day.

> +    atomic_mb_set(&qmp_dispatcher_co_busy, true);
> +    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
>  }
>  
>  int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp)
[...]
> diff --git a/util/aio-posix.c b/util/aio-posix.c
> index f7f13ebfc2..30bb21d699 100644
> --- a/util/aio-posix.c
> +++ b/util/aio-posix.c
> @@ -15,6 +15,7 @@
>  
>  #include "qemu/osdep.h"
>  #include "block/block.h"
> +#include "qemu/main-loop.h"
>  #include "qemu/rcu.h"
>  #include "qemu/rcu_queue.h"
>  #include "qemu/sockets.h"
> @@ -558,8 +559,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
>       * There cannot be two concurrent aio_poll calls for the same AioContext (or
>       * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
>       * We rely on this below to avoid slow locked accesses to ctx->notify_me.
> +     *
> +     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
> +     * is special in that it runs in the main thread, but that thread's context
> +     * is qemu_aio_context.

Wrapping the comment around column 70 or so would make it easier to
read.  Up to you.

>       */
> -    assert(in_aio_context_home_thread(ctx));
> +    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
> +                                      qemu_get_aio_context() : ctx));
>  
>      qemu_lockcnt_inc(&ctx->list_lock);
Markus Armbruster Oct. 2, 2020, 8:01 a.m. UTC | #19
Additional nitpick detail on Kevin's request.

Kevin Wolf <kwolf@redhat.com> writes:

> Add a function that can be used to move the currently running coroutine
> to a different AioContext (and therefore potentially a different
> thread).
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  include/block/aio.h | 10 ++++++++++
>  util/async.c        | 30 ++++++++++++++++++++++++++++++
>  2 files changed, 40 insertions(+)
>
> diff --git a/include/block/aio.h b/include/block/aio.h
> index b2f703fa3f..c37617b404 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -17,6 +17,7 @@
>  #ifdef CONFIG_LINUX_IO_URING
>  #include <liburing.h>
>  #endif
> +#include "qemu/coroutine.h"
>  #include "qemu/queue.h"
>  #include "qemu/event_notifier.h"
>  #include "qemu/thread.h"
> @@ -654,6 +655,15 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
>   */
>  void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
>  
> +/**
> + * aio_co_reschedule_self:
> + * @new_ctx: the new context
> + *
> + * Move the currently running coroutine to new_ctx. If the coroutine is already
> + * running in new_ctx, do nothing.

Wrapping the comment around column 70 or so would make it easier to
read.  Up to you.

> + */
> +void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx);
> +
>  /**
>   * aio_co_wake:
>   * @co: the coroutine
> diff --git a/util/async.c b/util/async.c
> index 4266745dee..a609e18693 100644
> --- a/util/async.c
> +++ b/util/async.c
> @@ -569,6 +569,36 @@ void aio_co_schedule(AioContext *ctx, Coroutine *co)
>      aio_context_unref(ctx);
>  }
>  
> +typedef struct AioCoRescheduleSelf {
> +    Coroutine *co;
> +    AioContext *new_ctx;
> +} AioCoRescheduleSelf;
> +
> +static void aio_co_reschedule_self_bh(void *opaque)
> +{
> +    AioCoRescheduleSelf *data = opaque;
> +    aio_co_schedule(data->new_ctx, data->co);
> +}
> +
> +void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
> +{
> +    AioContext *old_ctx = qemu_get_current_aio_context();
> +
> +    if (old_ctx != new_ctx) {
> +        AioCoRescheduleSelf data = {
> +            .co = qemu_coroutine_self(),
> +            .new_ctx = new_ctx,
> +        };
> +        /*
> +         * We can't directly schedule the coroutine in the target context
> +         * because this would be racy: The other thread could try to enter the
> +         * coroutine before it has yielded in this one.
> +         */

Likewise.

> +        aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data);
> +        qemu_coroutine_yield();
> +    }
> +}
> +
>  void aio_co_wake(struct Coroutine *co)
>  {
>      AioContext *ctx;