diff mbox series

[v3,03/18] migration/rdma: create multifd_setup_ops for Tx/Rx thread

Message ID 1602908748-43335-4-git-send-email-zhengchuan@huawei.com
State New
Headers show
Series Support Multifd for RDMA migration | expand

Commit Message

Zheng Chuan Oct. 17, 2020, 4:25 a.m. UTC
Create multifd_setup_ops for TxRx thread, no logic change.

Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
---
 migration/multifd.c | 44 +++++++++++++++++++++++++++++++++++++++-----
 migration/multifd.h |  7 +++++++
 2 files changed, 46 insertions(+), 5 deletions(-)

Comments

Dr. David Alan Gilbert Nov. 10, 2020, 12:11 p.m. UTC | #1
* Chuan Zheng (zhengchuan@huawei.com) wrote:
> Create multifd_setup_ops for TxRx thread, no logic change.

> 

> Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>

> ---

>  migration/multifd.c | 44 +++++++++++++++++++++++++++++++++++++++-----

>  migration/multifd.h |  7 +++++++

>  2 files changed, 46 insertions(+), 5 deletions(-)

> 

> diff --git a/migration/multifd.c b/migration/multifd.c

> index 68b171f..1f82307 100644

> --- a/migration/multifd.c

> +++ b/migration/multifd.c

> @@ -383,6 +383,8 @@ struct {

>      int exiting;

>      /* multifd ops */

>      MultiFDMethods *ops;

> +    /* multifd setup ops */

> +    MultiFDSetup *setup_ops;

>  } *multifd_send_state;

>  

>  /*

> @@ -790,8 +792,9 @@ static bool multifd_channel_connect(MultiFDSendParams *p,

>          } else {

>              /* update for tls qio channel */

>              p->c = ioc;

> -            qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,

> -                                   QEMU_THREAD_JOINABLE);

> +            qemu_thread_create(&p->thread, p->name,

> +                               multifd_send_state->setup_ops->send_thread_setup,

> +                               p, QEMU_THREAD_JOINABLE);

>         }

>         return false;

>      }

> @@ -839,6 +842,11 @@ cleanup:

>      multifd_new_send_channel_cleanup(p, sioc, local_err);

>  }

>  

> +static void multifd_send_channel_setup(MultiFDSendParams *p)

> +{

> +    socket_send_channel_create(multifd_new_send_channel_async, p);

> +}

> +

>  int multifd_save_setup(Error **errp)

>  {

>      int thread_count;

> @@ -856,6 +864,7 @@ int multifd_save_setup(Error **errp)

>      multifd_send_state->pages = multifd_pages_init(page_count);

>      qemu_sem_init(&multifd_send_state->channels_ready, 0);

>      qatomic_set(&multifd_send_state->exiting, 0);

> +    multifd_send_state->setup_ops = multifd_setup_ops_init();

>      multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

>  

>      for (i = 0; i < thread_count; i++) {

> @@ -875,7 +884,7 @@ int multifd_save_setup(Error **errp)

>          p->packet->version = cpu_to_be32(MULTIFD_VERSION);

>          p->name = g_strdup_printf("multifdsend_%d", i);

>          p->tls_hostname = g_strdup(s->hostname);

> -        socket_send_channel_create(multifd_new_send_channel_async, p);

> +        multifd_send_state->setup_ops->send_channel_setup(p);

>      }

>  

>      for (i = 0; i < thread_count; i++) {

> @@ -902,6 +911,8 @@ struct {

>      uint64_t packet_num;

>      /* multifd ops */

>      MultiFDMethods *ops;

> +    /* multifd setup ops */

> +    MultiFDSetup *setup_ops;

>  } *multifd_recv_state;

>  

>  static void multifd_recv_terminate_threads(Error *err)

> @@ -1095,6 +1106,7 @@ int multifd_load_setup(Error **errp)

>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

>      qatomic_set(&multifd_recv_state->count, 0);

>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);

> +    multifd_recv_state->setup_ops = multifd_setup_ops_init();

>      multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

>  

>      for (i = 0; i < thread_count; i++) {

> @@ -1173,9 +1185,31 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)

>      p->num_packets = 1;

>  

>      p->running = true;

> -    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,

> -                       QEMU_THREAD_JOINABLE);

> +    multifd_recv_state->setup_ops->recv_channel_setup(ioc, p);

> +    qemu_thread_create(&p->thread, p->name,

> +                       multifd_recv_state->setup_ops->recv_thread_setup,

> +                       p, QEMU_THREAD_JOINABLE);

>      qatomic_inc(&multifd_recv_state->count);

>      return qatomic_read(&multifd_recv_state->count) ==

>             migrate_multifd_channels();

>  }

> +

> +static void multifd_recv_channel_setup(QIOChannel *ioc, MultiFDRecvParams *p)

> +{

> +    return;

> +}

> +

> +static MultiFDSetup multifd_socket_ops = {

> +    .send_thread_setup = multifd_send_thread,

> +    .recv_thread_setup = multifd_recv_thread,

> +    .send_channel_setup = multifd_send_channel_setup,

> +    .recv_channel_setup = multifd_recv_channel_setup

> +};


I don't think you need '_setup' on the thread function names here.

Dave

> +MultiFDSetup *multifd_setup_ops_init(void)

> +{

> +    MultiFDSetup *ops;

> +

> +    ops = &multifd_socket_ops;

> +    return ops;

> +}

> diff --git a/migration/multifd.h b/migration/multifd.h

> index 8d6751f..446315b 100644

> --- a/migration/multifd.h

> +++ b/migration/multifd.h

> @@ -166,6 +166,13 @@ typedef struct {

>      int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);

>  } MultiFDMethods;

>  

> +typedef struct {

> +    void *(*send_thread_setup)(void *opaque);

> +    void *(*recv_thread_setup)(void *opaque);

> +    void (*send_channel_setup)(MultiFDSendParams *p);

> +    void (*recv_channel_setup)(QIOChannel *ioc, MultiFDRecvParams *p);

> +} MultiFDSetup;

> +

>  void multifd_register_ops(int method, MultiFDMethods *ops);

>  

>  #endif

> -- 

> 1.8.3.1

> 

-- 
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Zheng Chuan Nov. 11, 2020, 7:51 a.m. UTC | #2
On 2020/11/10 20:11, Dr. David Alan Gilbert wrote:
> * Chuan Zheng (zhengchuan@huawei.com) wrote:

>> Create multifd_setup_ops for TxRx thread, no logic change.

>>

>> Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>

>> ---

>>  migration/multifd.c | 44 +++++++++++++++++++++++++++++++++++++++-----

>>  migration/multifd.h |  7 +++++++

>>  2 files changed, 46 insertions(+), 5 deletions(-)

>>

>> diff --git a/migration/multifd.c b/migration/multifd.c

>> index 68b171f..1f82307 100644

>> --- a/migration/multifd.c

>> +++ b/migration/multifd.c

>> @@ -383,6 +383,8 @@ struct {

>>      int exiting;

>>      /* multifd ops */

>>      MultiFDMethods *ops;

>> +    /* multifd setup ops */

>> +    MultiFDSetup *setup_ops;

>>  } *multifd_send_state;

>>  

>>  /*

>> @@ -790,8 +792,9 @@ static bool multifd_channel_connect(MultiFDSendParams *p,

>>          } else {

>>              /* update for tls qio channel */

>>              p->c = ioc;

>> -            qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,

>> -                                   QEMU_THREAD_JOINABLE);

>> +            qemu_thread_create(&p->thread, p->name,

>> +                               multifd_send_state->setup_ops->send_thread_setup,

>> +                               p, QEMU_THREAD_JOINABLE);

>>         }

>>         return false;

>>      }

>> @@ -839,6 +842,11 @@ cleanup:

>>      multifd_new_send_channel_cleanup(p, sioc, local_err);

>>  }

>>  

>> +static void multifd_send_channel_setup(MultiFDSendParams *p)

>> +{

>> +    socket_send_channel_create(multifd_new_send_channel_async, p);

>> +}

>> +

>>  int multifd_save_setup(Error **errp)

>>  {

>>      int thread_count;

>> @@ -856,6 +864,7 @@ int multifd_save_setup(Error **errp)

>>      multifd_send_state->pages = multifd_pages_init(page_count);

>>      qemu_sem_init(&multifd_send_state->channels_ready, 0);

>>      qatomic_set(&multifd_send_state->exiting, 0);

>> +    multifd_send_state->setup_ops = multifd_setup_ops_init();

>>      multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

>>  

>>      for (i = 0; i < thread_count; i++) {

>> @@ -875,7 +884,7 @@ int multifd_save_setup(Error **errp)

>>          p->packet->version = cpu_to_be32(MULTIFD_VERSION);

>>          p->name = g_strdup_printf("multifdsend_%d", i);

>>          p->tls_hostname = g_strdup(s->hostname);

>> -        socket_send_channel_create(multifd_new_send_channel_async, p);

>> +        multifd_send_state->setup_ops->send_channel_setup(p);

>>      }

>>  

>>      for (i = 0; i < thread_count; i++) {

>> @@ -902,6 +911,8 @@ struct {

>>      uint64_t packet_num;

>>      /* multifd ops */

>>      MultiFDMethods *ops;

>> +    /* multifd setup ops */

>> +    MultiFDSetup *setup_ops;

>>  } *multifd_recv_state;

>>  

>>  static void multifd_recv_terminate_threads(Error *err)

>> @@ -1095,6 +1106,7 @@ int multifd_load_setup(Error **errp)

>>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

>>      qatomic_set(&multifd_recv_state->count, 0);

>>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);

>> +    multifd_recv_state->setup_ops = multifd_setup_ops_init();

>>      multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

>>  

>>      for (i = 0; i < thread_count; i++) {

>> @@ -1173,9 +1185,31 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)

>>      p->num_packets = 1;

>>  

>>      p->running = true;

>> -    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,

>> -                       QEMU_THREAD_JOINABLE);

>> +    multifd_recv_state->setup_ops->recv_channel_setup(ioc, p);

>> +    qemu_thread_create(&p->thread, p->name,

>> +                       multifd_recv_state->setup_ops->recv_thread_setup,

>> +                       p, QEMU_THREAD_JOINABLE);

>>      qatomic_inc(&multifd_recv_state->count);

>>      return qatomic_read(&multifd_recv_state->count) ==

>>             migrate_multifd_channels();

>>  }

>> +

>> +static void multifd_recv_channel_setup(QIOChannel *ioc, MultiFDRecvParams *p)

>> +{

>> +    return;

>> +}

>> +

>> +static MultiFDSetup multifd_socket_ops = {

>> +    .send_thread_setup = multifd_send_thread,

>> +    .recv_thread_setup = multifd_recv_thread,

>> +    .send_channel_setup = multifd_send_channel_setup,

>> +    .recv_channel_setup = multifd_recv_channel_setup

>> +};

> 

> I don't think you need '_setup' on the thread function names here.

> 

> Dave

> 

OK, done in my local tree.
>> +MultiFDSetup *multifd_setup_ops_init(void)

>> +{

>> +    MultiFDSetup *ops;

>> +

>> +    ops = &multifd_socket_ops;

>> +    return ops;

>> +}

>> diff --git a/migration/multifd.h b/migration/multifd.h

>> index 8d6751f..446315b 100644

>> --- a/migration/multifd.h

>> +++ b/migration/multifd.h

>> @@ -166,6 +166,13 @@ typedef struct {

>>      int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);

>>  } MultiFDMethods;

>>  

>> +typedef struct {

>> +    void *(*send_thread_setup)(void *opaque);

>> +    void *(*recv_thread_setup)(void *opaque);

>> +    void (*send_channel_setup)(MultiFDSendParams *p);

>> +    void (*recv_channel_setup)(QIOChannel *ioc, MultiFDRecvParams *p);

>> +} MultiFDSetup;

>> +

>>  void multifd_register_ops(int method, MultiFDMethods *ops);

>>  

>>  #endif

>> -- 

>> 1.8.3.1

>>


-- 
Regards.
Chuan
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index 68b171f..1f82307 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -383,6 +383,8 @@  struct {
     int exiting;
     /* multifd ops */
     MultiFDMethods *ops;
+    /* multifd setup ops */
+    MultiFDSetup *setup_ops;
 } *multifd_send_state;
 
 /*
@@ -790,8 +792,9 @@  static bool multifd_channel_connect(MultiFDSendParams *p,
         } else {
             /* update for tls qio channel */
             p->c = ioc;
-            qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
-                                   QEMU_THREAD_JOINABLE);
+            qemu_thread_create(&p->thread, p->name,
+                               multifd_send_state->setup_ops->send_thread_setup,
+                               p, QEMU_THREAD_JOINABLE);
        }
        return false;
     }
@@ -839,6 +842,11 @@  cleanup:
     multifd_new_send_channel_cleanup(p, sioc, local_err);
 }
 
+static void multifd_send_channel_setup(MultiFDSendParams *p)
+{
+    socket_send_channel_create(multifd_new_send_channel_async, p);
+}
+
 int multifd_save_setup(Error **errp)
 {
     int thread_count;
@@ -856,6 +864,7 @@  int multifd_save_setup(Error **errp)
     multifd_send_state->pages = multifd_pages_init(page_count);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
     qatomic_set(&multifd_send_state->exiting, 0);
+    multifd_send_state->setup_ops = multifd_setup_ops_init();
     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
 
     for (i = 0; i < thread_count; i++) {
@@ -875,7 +884,7 @@  int multifd_save_setup(Error **errp)
         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
         p->name = g_strdup_printf("multifdsend_%d", i);
         p->tls_hostname = g_strdup(s->hostname);
-        socket_send_channel_create(multifd_new_send_channel_async, p);
+        multifd_send_state->setup_ops->send_channel_setup(p);
     }
 
     for (i = 0; i < thread_count; i++) {
@@ -902,6 +911,8 @@  struct {
     uint64_t packet_num;
     /* multifd ops */
     MultiFDMethods *ops;
+    /* multifd setup ops */
+    MultiFDSetup *setup_ops;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -1095,6 +1106,7 @@  int multifd_load_setup(Error **errp)
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     qatomic_set(&multifd_recv_state->count, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+    multifd_recv_state->setup_ops = multifd_setup_ops_init();
     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
 
     for (i = 0; i < thread_count; i++) {
@@ -1173,9 +1185,31 @@  bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
     p->num_packets = 1;
 
     p->running = true;
-    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                       QEMU_THREAD_JOINABLE);
+    multifd_recv_state->setup_ops->recv_channel_setup(ioc, p);
+    qemu_thread_create(&p->thread, p->name,
+                       multifd_recv_state->setup_ops->recv_thread_setup,
+                       p, QEMU_THREAD_JOINABLE);
     qatomic_inc(&multifd_recv_state->count);
     return qatomic_read(&multifd_recv_state->count) ==
            migrate_multifd_channels();
 }
+
+static void multifd_recv_channel_setup(QIOChannel *ioc, MultiFDRecvParams *p)
+{
+    return;
+}
+
+static MultiFDSetup multifd_socket_ops = {
+    .send_thread_setup = multifd_send_thread,
+    .recv_thread_setup = multifd_recv_thread,
+    .send_channel_setup = multifd_send_channel_setup,
+    .recv_channel_setup = multifd_recv_channel_setup
+};
+
+MultiFDSetup *multifd_setup_ops_init(void)
+{
+    MultiFDSetup *ops;
+
+    ops = &multifd_socket_ops;
+    return ops;
+}
diff --git a/migration/multifd.h b/migration/multifd.h
index 8d6751f..446315b 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -166,6 +166,13 @@  typedef struct {
     int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
 } MultiFDMethods;
 
+typedef struct {
+    void *(*send_thread_setup)(void *opaque);
+    void *(*recv_thread_setup)(void *opaque);
+    void (*send_channel_setup)(MultiFDSendParams *p);
+    void (*recv_channel_setup)(QIOChannel *ioc, MultiFDRecvParams *p);
+} MultiFDSetup;
+
 void multifd_register_ops(int method, MultiFDMethods *ops);
 
 #endif