[RFC,v9,11/19] virtio/vsock: dequeue callback for SOCK_SEQPACKET

Message ID 20210508163523.3431999-1-arseny.krasnov@kaspersky.com
State Superseded
Headers show
Series
  • virtio/vsock: introduce SOCK_SEQPACKET support
Related show

Commit Message

Arseny Krasnov May 8, 2021, 4:35 p.m.
This adds the transport callback and its logic for SEQPACKET dequeue.
The callback fetches RW packets from the socket's rx queue until the whole
record is copied (if the user's buffer is full, the user is not woken up).
This is done to avoid stalling the sender: if we wake up the user and it
leaves the syscall, nobody will send a credit update for the rest of the
record, and the sender will wait until the receiver next enters the read
syscall. So if the user buffer is full, we just send a credit update and drop the data.

Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
---
 v8 -> v9:
 1) Check for RW packet type is removed from the loop (all packets are
    now considered RW).
 2) Locking in loop is fixed.
 3) cpu_to_le32()/le32_to_cpu() now used.
 4) MSG_TRUNC handling removed from transport.

 include/linux/virtio_vsock.h            |  5 ++
 net/vmw_vsock/virtio_transport_common.c | 64 +++++++++++++++++++++++++
 2 files changed, 69 insertions(+)

Comments

Stefano Garzarella May 13, 2021, 11:58 a.m. | #1
On Sat, May 08, 2021 at 07:35:20PM +0300, Arseny Krasnov wrote:
>This adds transport callback and it's logic for SEQPACKET dequeue.

>Callback fetches RW packets from rx queue of socket until whole record

>is copied(if user's buffer is full, user is not woken up). This is done

>to not stall sender, because if we wake up user and it leaves syscall,

>nobody will send credit update for rest of record, and sender will wait

>for next enter of read syscall at receiver's side. So if user buffer is

>full, we just send credit update and drop data.

>

>Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>

>---

> v8 -> v9:

> 1) Check for RW packet type is removed from loop(all packet now

>    considered RW).

> 2) Locking in loop is fixed.

> 3) cpu_to_le32()/le32_to_cpu() now used.

> 4) MSG_TRUNC handling removed from transport.

>

> include/linux/virtio_vsock.h            |  5 ++

> net/vmw_vsock/virtio_transport_common.c | 64 +++++++++++++++++++++++++

> 2 files changed, 69 insertions(+)

>

>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h

>index dc636b727179..02acf6e9ae04 100644

>--- a/include/linux/virtio_vsock.h

>+++ b/include/linux/virtio_vsock.h

>@@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,

> 			       struct msghdr *msg,

> 			       size_t len, int flags);

>

>+ssize_t

>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,

>+				   struct msghdr *msg,

>+				   int flags,

>+				   bool *msg_ready);

> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);

> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);

>

>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c

>index ad0d34d41444..f649a21dd23b 100644

>--- a/net/vmw_vsock/virtio_transport_common.c

>+++ b/net/vmw_vsock/virtio_transport_common.c

>@@ -393,6 +393,58 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,

> 	return err;

> }

>

>+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,

>+						 struct msghdr *msg,

>+						 int flags,

>+						 bool *msg_ready)

>+{

>+	struct virtio_vsock_sock *vvs = vsk->trans;

>+	struct virtio_vsock_pkt *pkt;

>+	int err = 0;

>+	size_t user_buf_len = msg->msg_iter.count;

>+

>+	*msg_ready = false;

>+	spin_lock_bh(&vvs->rx_lock);

>+

>+	while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {

>+		size_t bytes_to_copy;

>+		size_t pkt_len;

>+

>+		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);

>+		pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);

>+		bytes_to_copy = min(user_buf_len, pkt_len);

>+

>+		if (bytes_to_copy) {

>+			/* sk_lock is held by caller so no one else can dequeue.

>+			 * Unlock rx_lock since memcpy_to_msg() may sleep.

>+			 */

>+			spin_unlock_bh(&vvs->rx_lock);

>+

>+			if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {

>+				err = -EINVAL;

>+			} else {

>+				err += pkt_len;


If `bytes_to_copy == 0`, we are not increasing the real length.

Anyway, it is a bit confusing to increase a variable called `err`; I think
it is better to have another variable to store this information, which we
return if there are no errors.
Stefano Garzarella May 13, 2021, 12:18 p.m. | #2
On Sat, May 08, 2021 at 07:35:20PM +0300, Arseny Krasnov wrote:
>This adds transport callback and it's logic for SEQPACKET dequeue.

>Callback fetches RW packets from rx queue of socket until whole record

>is copied(if user's buffer is full, user is not woken up). This is done

>to not stall sender, because if we wake up user and it leaves syscall,

>nobody will send credit update for rest of record, and sender will wait

>for next enter of read syscall at receiver's side. So if user buffer is

>full, we just send credit update and drop data.

>

>Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>

>---

> v8 -> v9:

> 1) Check for RW packet type is removed from loop(all packet now

>    considered RW).

> 2) Locking in loop is fixed.

> 3) cpu_to_le32()/le32_to_cpu() now used.

> 4) MSG_TRUNC handling removed from transport.

>

> include/linux/virtio_vsock.h            |  5 ++

> net/vmw_vsock/virtio_transport_common.c | 64 +++++++++++++++++++++++++

> 2 files changed, 69 insertions(+)

>

>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h

>index dc636b727179..02acf6e9ae04 100644

>--- a/include/linux/virtio_vsock.h

>+++ b/include/linux/virtio_vsock.h

>@@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,

> 			       struct msghdr *msg,

> 			       size_t len, int flags);

>

>+ssize_t

>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,

>+				   struct msghdr *msg,

>+				   int flags,

>+				   bool *msg_ready);

> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);

> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);

>

>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c

>index ad0d34d41444..f649a21dd23b 100644

>--- a/net/vmw_vsock/virtio_transport_common.c

>+++ b/net/vmw_vsock/virtio_transport_common.c

>@@ -393,6 +393,58 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,

> 	return err;

> }

>

>+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,

>+						 struct msghdr *msg,

>+						 int flags,

>+						 bool *msg_ready)

>+{

>+	struct virtio_vsock_sock *vvs = vsk->trans;

>+	struct virtio_vsock_pkt *pkt;

>+	int err = 0;

>+	size_t user_buf_len = msg->msg_iter.count;


I forgot to mention that here, too, it is better to use `msg_data_left(msg)` instead of reading `msg->msg_iter.count` directly.

Thanks,
Stefano
Arseny Krasnov May 13, 2021, 2:42 p.m. | #3
On 13.05.2021 14:58, Stefano Garzarella wrote:
> On Sat, May 08, 2021 at 07:35:20PM +0300, Arseny Krasnov wrote:

>> This adds transport callback and it's logic for SEQPACKET dequeue.

>> Callback fetches RW packets from rx queue of socket until whole record

>> is copied(if user's buffer is full, user is not woken up). This is done

>> to not stall sender, because if we wake up user and it leaves syscall,

>> nobody will send credit update for rest of record, and sender will wait

>> for next enter of read syscall at receiver's side. So if user buffer is

>> full, we just send credit update and drop data.

>>

>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>

>> ---

>> v8 -> v9:

>> 1) Check for RW packet type is removed from loop(all packet now

>>    considered RW).

>> 2) Locking in loop is fixed.

>> 3) cpu_to_le32()/le32_to_cpu() now used.

>> 4) MSG_TRUNC handling removed from transport.

>>

>> include/linux/virtio_vsock.h            |  5 ++

>> net/vmw_vsock/virtio_transport_common.c | 64 +++++++++++++++++++++++++

>> 2 files changed, 69 insertions(+)

>>

>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h

>> index dc636b727179..02acf6e9ae04 100644

>> --- a/include/linux/virtio_vsock.h

>> +++ b/include/linux/virtio_vsock.h

>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,

>> 			       struct msghdr *msg,

>> 			       size_t len, int flags);

>>

>> +ssize_t

>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,

>> +				   struct msghdr *msg,

>> +				   int flags,

>> +				   bool *msg_ready);

>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);

>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);

>>

>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c

>> index ad0d34d41444..f649a21dd23b 100644

>> --- a/net/vmw_vsock/virtio_transport_common.c

>> +++ b/net/vmw_vsock/virtio_transport_common.c

>> @@ -393,6 +393,58 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,

>> 	return err;

>> }

>>

>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,

>> +						 struct msghdr *msg,

>> +						 int flags,

>> +						 bool *msg_ready)

>> +{

>> +	struct virtio_vsock_sock *vvs = vsk->trans;

>> +	struct virtio_vsock_pkt *pkt;

>> +	int err = 0;

>> +	size_t user_buf_len = msg->msg_iter.count;

>> +

>> +	*msg_ready = false;

>> +	spin_lock_bh(&vvs->rx_lock);

>> +

>> +	while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {

>> +		size_t bytes_to_copy;

>> +		size_t pkt_len;

>> +

>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);

>> +		pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);

>> +		bytes_to_copy = min(user_buf_len, pkt_len);

>> +

>> +		if (bytes_to_copy) {

>> +			/* sk_lock is held by caller so no one else can dequeue.

>> +			 * Unlock rx_lock since memcpy_to_msg() may sleep.

>> +			 */

>> +			spin_unlock_bh(&vvs->rx_lock);

>> +

>> +			if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {

>> +				err = -EINVAL;

>> +			} else {

>> +				err += pkt_len;

> If `bytes_to_copy == 0` we are not increasing the real length.

>

> Anyway is a bit confusing increase a variable called `err`, I think is 

> better to have another variable to store this information that we return 

> if there aren't errors.

Ack
>

>

Patch

diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index dc636b727179..02acf6e9ae04 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -80,6 +80,11 @@  virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
 			       struct msghdr *msg,
 			       size_t len, int flags);
 
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+				   struct msghdr *msg,
+				   int flags,
+				   bool *msg_ready);
 s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
 s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
 
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index ad0d34d41444..f649a21dd23b 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -393,6 +393,58 @@  virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
 	return err;
 }
 
+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
+						 struct msghdr *msg,
+						 int flags,
+						 bool *msg_ready)
+{
+	struct virtio_vsock_sock *vvs = vsk->trans;
+	struct virtio_vsock_pkt *pkt;
+	int err = 0;
+	size_t user_buf_len = msg->msg_iter.count;
+
+	*msg_ready = false;
+	spin_lock_bh(&vvs->rx_lock);
+
+	while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {
+		size_t bytes_to_copy;
+		size_t pkt_len;
+
+		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+		pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
+		bytes_to_copy = min(user_buf_len, pkt_len);
+
+		if (bytes_to_copy) {
+			/* sk_lock is held by caller so no one else can dequeue.
+			 * Unlock rx_lock since memcpy_to_msg() may sleep.
+			 */
+			spin_unlock_bh(&vvs->rx_lock);
+
+			if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
+				err = -EINVAL;
+			} else {
+				err += pkt_len;
+				user_buf_len -= bytes_to_copy;
+			}
+
+			spin_lock_bh(&vvs->rx_lock);
+		}
+
+		if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)
+			*msg_ready = true;
+
+		virtio_transport_dec_rx_pkt(vvs, pkt);
+		list_del(&pkt->list);
+		virtio_transport_free_pkt(pkt);
+	}
+
+	spin_unlock_bh(&vvs->rx_lock);
+
+	virtio_transport_send_credit_update(vsk);
+
+	return err;
+}
+
 ssize_t
 virtio_transport_stream_dequeue(struct vsock_sock *vsk,
 				struct msghdr *msg,
@@ -405,6 +457,18 @@  virtio_transport_stream_dequeue(struct vsock_sock *vsk,
 }
 EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
 
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+				   struct msghdr *msg,
+				   int flags, bool *msg_ready)
+{
+	if (flags & MSG_PEEK)
+		return -EOPNOTSUPP;
+
+	return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags, msg_ready);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
+
 int
 virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
 			       struct msghdr *msg,