diff mbox series

[PULL,v3,15/28] util/vhost-user-server: rework vu_client_trip() coroutine lifecycle

Message ID 20201023152147.1016281-16-stefanha@redhat.com
State Superseded
Headers show
Series Block patches | expand

Commit Message

Stefan Hajnoczi Oct. 23, 2020, 3:21 p.m. UTC
The vu_client_trip() coroutine is leaked during AioContext switching. It
is also unsafe to destroy the vu_dev in panic_cb() since its callers
still access it in some cases.

Rework the lifecycle to solve these safety issues.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Message-id: 20200924151549.913737-10-stefanha@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 util/vhost-user-server.h             |  29 ++--
 block/export/vhost-user-blk-server.c |   9 +-
 util/vhost-user-server.c             | 245 +++++++++++++++------------
 3 files changed, 155 insertions(+), 128 deletions(-)
diff mbox series

Patch

diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
index 92177fc911..0da4c2cc4c 100644
--- a/util/vhost-user-server.h
+++ b/util/vhost-user-server.h
@@ -19,34 +19,36 @@ 
 #include "qapi/error.h"
 #include "standard-headers/linux/virtio_blk.h"
 
+/* A kick fd that we monitor on behalf of libvhost-user */
 typedef struct VuFdWatch {
     VuDev *vu_dev;
     int fd; /*kick fd*/
     void *pvt;
     vu_watch_cb cb;
-    bool processing;
     QTAILQ_ENTRY(VuFdWatch) next;
 } VuFdWatch;
 
-typedef struct VuServer VuServer;
-
-struct VuServer {
+/**
+ * VuServer:
+ * A vhost-user server instance with user-defined VuDevIface callbacks.
+ * Vhost-user device backends can be implemented using VuServer. VuDevIface
+ * callbacks and virtqueue kicks run in the given AioContext.
+ */
+typedef struct {
     QIONetListener *listener;
+    QEMUBH *restart_listener_bh;
     AioContext *ctx;
     int max_queues;
     const VuDevIface *vu_iface;
+
+    /* Protected by ctx lock */
     VuDev vu_dev;
     QIOChannel *ioc; /* The I/O channel with the client */
     QIOChannelSocket *sioc; /* The underlying data channel with the client */
-    /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
-    QIOChannel *ioc_slave;
-    QIOChannelSocket *sioc_slave;
-    Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
     QTAILQ_HEAD(, VuFdWatch) vu_fd_watches;
-    /* restart coroutine co_trip if AIOContext is changed */
-    bool aio_context_changed;
-    bool processing_msg;
-};
+
+    Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
+} VuServer;
 
 bool vhost_user_server_start(VuServer *server,
                              SocketAddress *unix_socket,
@@ -57,6 +59,7 @@  bool vhost_user_server_start(VuServer *server,
 
 void vhost_user_server_stop(VuServer *server);
 
-void vhost_user_server_set_aio_context(VuServer *server, AioContext *ctx);
+void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx);
+void vhost_user_server_detach_aio_context(VuServer *server);
 
 #endif /* VHOST_USER_SERVER_H */
diff --git a/block/export/vhost-user-blk-server.c b/block/export/vhost-user-blk-server.c
index c8fa4ecba9..4d35232bf3 100644
--- a/block/export/vhost-user-blk-server.c
+++ b/block/export/vhost-user-blk-server.c
@@ -313,18 +313,13 @@  static const VuDevIface vu_block_iface = {
 static void blk_aio_attached(AioContext *ctx, void *opaque)
 {
     VuBlockDev *vub_dev = opaque;
-    aio_context_acquire(ctx);
-    vhost_user_server_set_aio_context(&vub_dev->vu_server, ctx);
-    aio_context_release(ctx);
+    vhost_user_server_attach_aio_context(&vub_dev->vu_server, ctx);
 }
 
 static void blk_aio_detach(void *opaque)
 {
     VuBlockDev *vub_dev = opaque;
-    AioContext *ctx = vub_dev->vu_server.ctx;
-    aio_context_acquire(ctx);
-    vhost_user_server_set_aio_context(&vub_dev->vu_server, NULL);
-    aio_context_release(ctx);
+    vhost_user_server_detach_aio_context(&vub_dev->vu_server);
 }
 
 static void
diff --git a/util/vhost-user-server.c b/util/vhost-user-server.c
index 981908fef0..c448800e58 100644
--- a/util/vhost-user-server.c
+++ b/util/vhost-user-server.c
@@ -9,8 +9,50 @@ 
  */
 #include "qemu/osdep.h"
 #include "qemu/main-loop.h"
+#include "block/aio-wait.h"
 #include "vhost-user-server.h"
 
+/*
+ * Theory of operation:
+ *
+ * VuServer is started and stopped by vhost_user_server_start() and
+ * vhost_user_server_stop() from the main loop thread. Starting the server
+ * opens a vhost-user UNIX domain socket and listens for incoming connections.
+ * Only one connection is allowed at a time.
+ *
+ * The connection is handled by the vu_client_trip() coroutine in the
+ * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
+ * where libvhost-user calls vu_message_read() to receive the next vhost-user
+ * protocol messages over the UNIX domain socket.
+ *
+ * When virtqueues are set up libvhost-user calls set_watch() to monitor kick
+ * fds. These fds are also handled in the VuServer->ctx AioContext.
+ *
+ * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
+ * the socket connection. Shutting down the socket connection causes
+ * vu_message_read() to fail since no more data can be received from the socket.
+ * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop
+ * libvhost-user before terminating the coroutine. vu_deinit() calls
+ * remove_watch() to stop monitoring kick fds and this stops virtqueue
+ * processing.
+ *
+ * When vu_client_trip() has finished cleaning up it schedules a BH in the main
+ * loop thread to accept the next client connection.
+ *
+ * When libvhost-user detects an error it calls panic_cb() and sets the
+ * dev->broken flag. Both vu_client_trip() and kick fd processing stop when
+ * the dev->broken flag is set.
+ *
+ * It is possible to switch AioContexts using
+ * vhost_user_server_detach_aio_context() and
+ * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old
+ * AioContext and resume monitoring in the new AioContext. The vu_client_trip()
+ * coroutine remains in a yielded state during the switch. This is made
+ * possible by QIOChannel's support for spurious coroutine re-entry in
+ * qio_channel_yield(). The coroutine will restart I/O when re-entered from the
+ * new AioContext.
+ */
+
 static void vmsg_close_fds(VhostUserMsg *vmsg)
 {
     int i;
@@ -27,68 +69,9 @@  static void vmsg_unblock_fds(VhostUserMsg *vmsg)
     }
 }
 
-static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
-                      gpointer opaque);
-
-static void close_client(VuServer *server)
-{
-    /*
-     * Before closing the client
-     *
-     * 1. Let vu_client_trip stop processing new vhost-user msg
-     *
-     * 2. remove kick_handler
-     *
-     * 3. wait for the kick handler to be finished
-     *
-     * 4. wait for the current vhost-user msg to be finished processing
-     */
-
-    QIOChannelSocket *sioc = server->sioc;
-    /* When this is set vu_client_trip will stop new processing vhost-user message */
-    server->sioc = NULL;
-
-    while (server->processing_msg) {
-        if (server->ioc->read_coroutine) {
-            server->ioc->read_coroutine = NULL;
-            qio_channel_set_aio_fd_handler(server->ioc, server->ioc->ctx, NULL,
-                                           NULL, server->ioc);
-            server->processing_msg = false;
-        }
-    }
-
-    vu_deinit(&server->vu_dev);
-
-    /* vu_deinit() should have called remove_watch() */
-    assert(QTAILQ_EMPTY(&server->vu_fd_watches));
-
-    object_unref(OBJECT(sioc));
-    object_unref(OBJECT(server->ioc));
-}
-
 static void panic_cb(VuDev *vu_dev, const char *buf)
 {
-    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
-
-    /* avoid while loop in close_client */
-    server->processing_msg = false;
-
-    if (buf) {
-        error_report("vu_panic: %s", buf);
-    }
-
-    if (server->sioc) {
-        close_client(server);
-    }
-
-    /*
-     * Set the callback function for network listener so another
-     * vhost-user client can connect to this server
-     */
-    qio_net_listener_set_client_func(server->listener,
-                                     vu_accept,
-                                     server,
-                                     NULL);
+    error_report("vu_panic: %s", buf);
 }
 
 static bool coroutine_fn
@@ -185,28 +168,31 @@  fail:
     return false;
 }
 
-
-static void vu_client_start(VuServer *server);
 static coroutine_fn void vu_client_trip(void *opaque)
 {
     VuServer *server = opaque;
+    VuDev *vu_dev = &server->vu_dev;
 
-    while (!server->aio_context_changed && server->sioc) {
-        server->processing_msg = true;
-        vu_dispatch(&server->vu_dev);
-        server->processing_msg = false;
+    while (!vu_dev->broken && vu_dispatch(vu_dev)) {
+        /* Keep running */
     }
 
-    if (server->aio_context_changed && server->sioc) {
-        server->aio_context_changed = false;
-        vu_client_start(server);
-    }
-}
+    vu_deinit(vu_dev);
+
+    /* vu_deinit() should have called remove_watch() */
+    assert(QTAILQ_EMPTY(&server->vu_fd_watches));
+
+    object_unref(OBJECT(server->sioc));
+    server->sioc = NULL;
 
-static void vu_client_start(VuServer *server)
-{
-    server->co_trip = qemu_coroutine_create(vu_client_trip, server);
-    aio_co_enter(server->ctx, server->co_trip);
+    object_unref(OBJECT(server->ioc));
+    server->ioc = NULL;
+
+    server->co_trip = NULL;
+    if (server->restart_listener_bh) {
+        qemu_bh_schedule(server->restart_listener_bh);
+    }
+    aio_wait_kick();
 }
 
 /*
@@ -219,12 +205,18 @@  static void vu_client_start(VuServer *server)
 static void kick_handler(void *opaque)
 {
     VuFdWatch *vu_fd_watch = opaque;
-    vu_fd_watch->processing = true;
-    vu_fd_watch->cb(vu_fd_watch->vu_dev, 0, vu_fd_watch->pvt);
-    vu_fd_watch->processing = false;
+    VuDev *vu_dev = vu_fd_watch->vu_dev;
+
+    vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt);
+
+    /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */
+    if (vu_dev->broken) {
+        VuServer *server = container_of(vu_dev, VuServer, vu_dev);
+
+        qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    }
 }
 
-
 static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd)
 {
 
@@ -319,62 +311,95 @@  static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
     qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
     server->ioc = QIO_CHANNEL(sioc);
     object_ref(OBJECT(server->ioc));
-    qio_channel_attach_aio_context(server->ioc, server->ctx);
+
+    /* TODO vu_message_write() spins if non-blocking! */
     qio_channel_set_blocking(server->ioc, false, NULL);
-    vu_client_start(server);
+
+    server->co_trip = qemu_coroutine_create(vu_client_trip, server);
+
+    aio_context_acquire(server->ctx);
+    vhost_user_server_attach_aio_context(server, server->ctx);
+    aio_context_release(server->ctx);
 }
 
-
 void vhost_user_server_stop(VuServer *server)
 {
+    aio_context_acquire(server->ctx);
+
+    qemu_bh_delete(server->restart_listener_bh);
+    server->restart_listener_bh = NULL;
+
     if (server->sioc) {
-        close_client(server);
+        VuFdWatch *vu_fd_watch;
+
+        QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
+            aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true,
+                               NULL, NULL, NULL, vu_fd_watch);
+        }
+
+        qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+
+        AIO_WAIT_WHILE(server->ctx, server->co_trip);
     }
 
+    aio_context_release(server->ctx);
+
     if (server->listener) {
         qio_net_listener_disconnect(server->listener);
         object_unref(OBJECT(server->listener));
     }
+}
+
+/*
+ * Allow the next client to connect to the server. Called from a BH in the main
+ * loop.
+ */
+static void restart_listener_bh(void *opaque)
+{
+    VuServer *server = opaque;
 
+    qio_net_listener_set_client_func(server->listener, vu_accept, server,
+                                     NULL);
 }
 
-void vhost_user_server_set_aio_context(VuServer *server, AioContext *ctx)
+/* Called with ctx acquired */
+void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
 {
-    VuFdWatch *vu_fd_watch, *next;
-    void *opaque = NULL;
-    IOHandler *io_read = NULL;
-    bool attach;
+    VuFdWatch *vu_fd_watch;
 
-    server->ctx = ctx ? ctx : qemu_get_aio_context();
+    server->ctx = ctx;
 
     if (!server->sioc) {
-        /* not yet serving any client*/
         return;
     }
 
-    if (ctx) {
-        qio_channel_attach_aio_context(server->ioc, ctx);
-        server->aio_context_changed = true;
-        io_read = kick_handler;
-        attach = true;
-    } else {
+    qio_channel_attach_aio_context(server->ioc, ctx);
+
+    QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
+        aio_set_fd_handler(ctx, vu_fd_watch->fd, true, kick_handler, NULL,
+                           NULL, vu_fd_watch);
+    }
+
+    aio_co_schedule(ctx, server->co_trip);
+}
+
+/* Called with server->ctx acquired */
+void vhost_user_server_detach_aio_context(VuServer *server)
+{
+    if (server->sioc) {
+        VuFdWatch *vu_fd_watch;
+
+        QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
+            aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true,
+                               NULL, NULL, NULL, vu_fd_watch);
+        }
+
         qio_channel_detach_aio_context(server->ioc);
-        /* server->ioc->ctx keeps the old AioConext */
-        ctx = server->ioc->ctx;
-        attach = false;
     }
 
-    QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
-        if (vu_fd_watch->cb) {
-            opaque = attach ? vu_fd_watch : NULL;
-            aio_set_fd_handler(ctx, vu_fd_watch->fd, true,
-                               io_read, NULL, NULL,
-                               opaque);
-        }
-    }
+    server->ctx = NULL;
 }
 
-
 bool vhost_user_server_start(VuServer *server,
                              SocketAddress *socket_addr,
                              AioContext *ctx,
@@ -382,6 +407,7 @@  bool vhost_user_server_start(VuServer *server,
                              const VuDevIface *vu_iface,
                              Error **errp)
 {
+    QEMUBH *bh;
     QIONetListener *listener = qio_net_listener_new();
     if (qio_net_listener_open_sync(listener, socket_addr, 1,
                                    errp) < 0) {
@@ -389,9 +415,12 @@  bool vhost_user_server_start(VuServer *server,
         return false;
     }
 
+    bh = qemu_bh_new(restart_listener_bh, server);
+
     /* zero out unspecified fields */
     *server = (VuServer) {
         .listener              = listener,
+        .restart_listener_bh   = bh,
         .vu_iface              = vu_iface,
         .max_queues            = max_queues,
         .ctx                   = ctx,