[2/3] libceph: add sparse read support to OSD client

Message ID 20220309123323.20593-3-jlayton@kernel.org
State New
Series [1/3] libceph: add sparse read support to msgr2 crc state machine

Commit Message

Jeff Layton March 9, 2022, 12:33 p.m. UTC
Add a new sparse_read operation for the OSD client, driven by its own
state machine. The messenger calls the sparse_read operation repeatedly,
and it passes back the information needed to set up the read of the next
extent of data, zeroing out the sparse regions (holes) in the buffer as
it goes.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/osd_client.h |  38 ++++++++
 net/ceph/osd_client.c           | 163 ++++++++++++++++++++++++++++++--
 2 files changed, 194 insertions(+), 7 deletions(-)
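The contract between the messenger and this state machine is worth making
concrete: each call returns 1 with *plen/*pbuf describing where the next
chunk of wire data should land (a NULL buffer meaning "into the data
cursor"), 0 once the read is complete, or a negative errno on failure. A
minimal user-space sketch of that contract follows; all names here are
hypothetical mocks, not the actual msgr2 code.

#include <stdio.h>
#include <stddef.h>

/* Hypothetical mock of the sparse_read contract described above. */
struct mock_sr {
	int step;		/* mock state machine position */
	unsigned int count;	/* receive target for the extent count */
};

static int mock_sparse_read(struct mock_sr *sr, size_t *plen, char **pbuf)
{
	switch (sr->step++) {
	case 0:	/* extent count header lands in one of our fields */
		*plen = sizeof(sr->count);
		*pbuf = (char *)&sr->count;
		return 1;
	case 1:	/* pretend a single 4k extent of data follows */
		*plen = 4096;
		*pbuf = NULL;	/* NULL: receive into the data cursor */
		return 1;
	default:
		return 0;	/* sparse read complete */
	}
}

int main(void)
{
	struct mock_sr sr = { 0 };
	size_t len;
	char *buf;
	int ret;

	/* messenger-style loop: keep asking until done or error */
	while ((ret = mock_sparse_read(&sr, &len, &buf)) > 0)
		printf("read %zu bytes into %s\n", len,
		       buf ? "a static field" : "the data cursor");
	return ret < 0 ? 1 : 0;
}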

Comments

Jeff Layton March 11, 2022, 5:08 p.m. UTC | #1
On Wed, 2022-03-09 at 07:33 -0500, Jeff Layton wrote:
> Add a new sparse_read operation for the OSD client, driven by its own
> state machine. The messenger calls the sparse_read operation repeatedly,
> and it passes back the information needed to set up the read of the next
> extent of data, zeroing out the sparse regions (holes) in the buffer as
> it goes.
> 
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  include/linux/ceph/osd_client.h |  38 ++++++++
>  net/ceph/osd_client.c           | 163 ++++++++++++++++++++++++++++++--
>  2 files changed, 194 insertions(+), 7 deletions(-)
> 
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 3431011f364d..42eb1628a66d 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -29,6 +29,43 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
>  
>  #define CEPH_HOMELESS_OSD	-1
>  
> +enum ceph_sparse_read_state {
> +	CEPH_SPARSE_READ_HDR	= 0,
> +	CEPH_SPARSE_READ_EXTENTS,
> +	CEPH_SPARSE_READ_DATA_LEN,
> +	CEPH_SPARSE_READ_DATA,
> +};
> +
> +/* A single extent in a SPARSE_READ reply */
> +struct ceph_sparse_extent {
> +	__le64	off;
> +	__le64	len;
> +} __attribute__((packed));
> +
> +/*
> + * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of
> + * 64-bit offset/length pairs, and then all of the actual file data
> + * concatenated after it (sans holes).
> + *
> + * Unfortunately, we don't know how long the extent array is until we've
> + * started reading the data section of the reply, so for a real sparse read, we
> + * have to allocate the array after alloc_msg returns.
> + *
> + * For the common case of a single extent, we keep an embedded extent here so
> + * we can avoid the extra allocation.
> + */
> +struct ceph_sparse_read {
> +	enum ceph_sparse_read_state	sr_state;	/* state machine state */
> +	u64				sr_req_off;	/* orig request offset */
> +	u64				sr_req_len;	/* orig request length */
> +	u64				sr_pos;		/* current pos in buffer */
> +	int				sr_index;	/* current extent index */
> +	__le32				sr_datalen;	/* length of actual data */
> +	__le32				sr_count;	/* extent count */
> +	struct ceph_sparse_extent	*sr_extent;	/* extent array */
> +	struct ceph_sparse_extent	sr_emb_ext[1];	/* embedded extent */
> +};
> +
>  /* a given osd we're communicating with */
>  struct ceph_osd {
>  	refcount_t o_ref;
> @@ -46,6 +83,7 @@ struct ceph_osd {
>  	unsigned long lru_ttl;
>  	struct list_head o_keepalive_item;
>  	struct mutex lock;
> +	struct ceph_sparse_read	o_sparse_read;
>  };
>  
>  #define CEPH_OSD_SLAB_OPS	2
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 1c5815530e0d..f519b5727ee3 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
>  
>  	switch (op->op) {
>  	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_SPARSE_READ:
>  	case CEPH_OSD_OP_WRITE:
>  	case CEPH_OSD_OP_WRITEFULL:
>  		ceph_osd_data_release(&op->extent.osd_data);
> @@ -706,6 +707,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
>  		/* reply */
>  		case CEPH_OSD_OP_STAT:
>  		case CEPH_OSD_OP_READ:
> +		case CEPH_OSD_OP_SPARSE_READ:
>  		case CEPH_OSD_OP_LIST_WATCHERS:
>  			*num_reply_data_items += 1;
>  			break;
> @@ -775,7 +777,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
>  
>  	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
>  	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
> -	       opcode != CEPH_OSD_OP_TRUNCATE);
> +	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
>  
>  	op->extent.offset = offset;
>  	op->extent.length = length;
> @@ -984,6 +986,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
>  	case CEPH_OSD_OP_STAT:
>  		break;
>  	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_SPARSE_READ:
>  	case CEPH_OSD_OP_WRITE:
>  	case CEPH_OSD_OP_WRITEFULL:
>  	case CEPH_OSD_OP_ZERO:
> @@ -1080,7 +1083,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
>  
>  	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
>  	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
> -	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
> +	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
> +	       opcode != CEPH_OSD_OP_SPARSE_READ);
>  
>  	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
>  					GFP_NOFS);
> @@ -2037,6 +2041,7 @@ static void setup_request_data(struct ceph_osd_request *req)
>  					       &op->raw_data_in);
>  			break;
>  		case CEPH_OSD_OP_READ:
> +		case CEPH_OSD_OP_SPARSE_READ:
>  			ceph_osdc_msg_data_add(reply_msg,
>  					       &op->extent.osd_data);
>  			break;
> @@ -2443,6 +2448,21 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked)
>  	__submit_request(req, wrlocked);
>  }
>  
> +static void ceph_init_sparse_read(struct ceph_sparse_read *sr, struct ceph_osd_req_op *op)
> +{
> +	if (sr->sr_extent != sr->sr_emb_ext)
> +		kfree(sr->sr_extent);
> +	sr->sr_state = CEPH_SPARSE_READ_HDR;
> +	sr->sr_req_off = op ? op->extent.offset : 0;
> +	sr->sr_req_len = op ? op->extent.length : 0;
> +	sr->sr_pos = sr->sr_req_off;
> +	sr->sr_index = 0;
> +	sr->sr_count = 0;
> +	sr->sr_extent = sr->sr_emb_ext;
> +	sr->sr_extent[0].off = 0;
> +	sr->sr_extent[0].len = 0;
> +}
> +

I think this patch also needs to make osd_cleanup() call
ceph_init_sparse_read() as well, to ensure that we kfree the sr_extent
array (if one was allocated and the previous read didn't complete).
Fixed in my tree...
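
For reference, a minimal sketch of that cleanup call (placement and the
surrounding function body are illustrative only; the actual change is the
one described above):

static void osd_cleanup(struct ceph_osd *osd)
{
	/*
	 * Free a dynamically allocated sr_extent array left over from a
	 * sparse read that never completed. ceph_init_sparse_read() does
	 * the kfree and points sr_extent back at the embedded array.
	 */
	ceph_init_sparse_read(&osd->o_sparse_read, NULL);

	/* ... existing osd_cleanup() teardown continues here ... */
}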

>  static void finish_request(struct ceph_osd_request *req)
>  {
>  	struct ceph_osd_client *osdc = req->r_osdc;
> @@ -2452,8 +2472,10 @@ static void finish_request(struct ceph_osd_request *req)
>  
>  	req->r_end_latency = ktime_get();
>  
> -	if (req->r_osd)
> +	if (req->r_osd) {
> +		ceph_init_sparse_read(&req->r_osd->o_sparse_read, NULL);
>  		unlink_request(req->r_osd, req);
> +	}
>  	atomic_dec(&osdc->num_requests);
>  
>  	/*
> @@ -3655,6 +3677,8 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
>  	struct MOSDOpReply m;
>  	u64 tid = le64_to_cpu(msg->hdr.tid);
>  	u32 data_len = 0;
> +	u32 result_len = 0;
> +	bool sparse = false;
>  	int ret;
>  	int i;
>  
> @@ -3749,21 +3773,32 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
>  		req->r_ops[i].rval = m.rval[i];
>  		req->r_ops[i].outdata_len = m.outdata_len[i];
>  		data_len += m.outdata_len[i];
> +		if (req->r_ops[i].op == CEPH_OSD_OP_SPARSE_READ)
> +			sparse = true;
>  	}
> +
> +	result_len = data_len;
> +	if (sparse) {
> +		struct ceph_sparse_read *sr = &osd->o_sparse_read;
> +
> +		/* Fudge the result if this was a sparse read. */
> +		result_len = sr->sr_pos - sr->sr_req_off;
> +	}
> +
>  	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
>  		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
>  		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
>  		goto fail_request;
>  	}
> -	dout("%s req %p tid %llu result %d data_len %u\n", __func__,
> -	     req, req->r_tid, m.result, data_len);
> +	dout("%s req %p tid %llu result %d data_len %u result_len %u\n", __func__,
> +	     req, req->r_tid, m.result, data_len, result_len);
>  
>  	/*
>  	 * Since we only ever request ONDISK, we should only ever get
>  	 * one (type of) reply back.
>  	 */
>  	WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
> -	req->r_result = m.result ?: data_len;
> +	req->r_result = m.result ?: result_len;
>  	finish_request(req);
>  	mutex_unlock(&osd->lock);
>  	up_read(&osdc->lock);
> @@ -5398,6 +5433,21 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
>  	ceph_msg_put(msg);
>  }
>  
> +static struct ceph_osd_req_op *
> +sparse_read_op(struct ceph_osd_request *req)
> +{
> +	int i;
> +
> +	if (!(req->r_flags & CEPH_OSD_FLAG_READ))
> +		return NULL;
> +
> +	for (i = 0; i < req->r_num_ops; ++i) {
> +		if (req->r_ops[i].op == CEPH_OSD_OP_SPARSE_READ)
> +			return &req->r_ops[i];
> +	}
> +	return NULL;
> +}
> +
>  /*
>   * Lookup and return message for incoming reply.  Don't try to do
>   * anything about a larger than preallocated data portion of the
> @@ -5414,6 +5464,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>  	int front_len = le32_to_cpu(hdr->front_len);
>  	int data_len = le32_to_cpu(hdr->data_len);
>  	u64 tid = le64_to_cpu(hdr->tid);
> +	struct ceph_osd_req_op *srop;
>  
>  	down_read(&osdc->lock);
>  	if (!osd_registered(osd)) {
> @@ -5446,7 +5497,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>  		req->r_reply = m;
>  	}
>  
> -	if (data_len > req->r_reply->data_length) {
> +	srop = sparse_read_op(req);
> +
> +	if (!srop && (data_len > req->r_reply->data_length)) {
>  		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
>  			__func__, osd->o_osd, req->r_tid, data_len,
>  			req->r_reply->data_length);
> @@ -5456,6 +5509,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>  	}
>  
>  	m = ceph_msg_get(req->r_reply);
> +	m->sparse_read = srop;
> +	if (srop)
> +		ceph_init_sparse_read(&osd->o_sparse_read, srop);
> +
>  	dout("get_reply tid %lld %p\n", tid, m);
>  
>  out_unlock_session:
> @@ -5688,9 +5745,101 @@ static int osd_check_message_signature(struct ceph_msg *msg)
>  	return ceph_auth_check_message_signature(auth, msg);
>  }
>  
> +static void zero_len(struct ceph_msg_data_cursor *cursor, size_t len)
> +{
> +	while (len) {
> +		struct page *page;
> +		size_t poff, plen;
> +		bool last = false;
> +
> +		page = ceph_msg_data_next(cursor, &poff, &plen, &last);
> +		if (plen > len)
> +			plen = len;
> +		zero_user_segment(page, poff, poff + plen);
> +		len -= plen;
> +		ceph_msg_data_advance(cursor, plen);
> +	}
> +}
> +
> +static int osd_sparse_read(struct ceph_connection *con,
> +			   struct ceph_msg_data_cursor *cursor,
> +			   u64 *plen, char **pbuf)
> +{
> +	struct ceph_osd *o = con->private;
> +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> +	u32 count = le32_to_cpu(sr->sr_count);
> +	u64 eoff, elen;
> +
> +	switch (sr->sr_state) {
> +	case CEPH_SPARSE_READ_HDR:
> +		dout("[%d] request to read 0x%llx~0x%llx\n", o->o_osd, sr->sr_req_off, sr->sr_req_len);
> +		/* number of extents */
> +		*plen = sizeof(sr->sr_count);
> +		*pbuf = (char *)&sr->sr_count;
> +		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
> +		break;
> +	case CEPH_SPARSE_READ_EXTENTS:
> +		dout("[%d] got %u extents\n", o->o_osd, count);
> +
> +		if (count > 0) {
> +			if (count > 1) {
> +				/* can't use the embedded extent array */
> +				sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent),
> +							   GFP_NOIO);
> +				if (!sr->sr_extent)
> +					return -ENOMEM;
> +			}
> +			*plen = count * sizeof(*sr->sr_extent);
> +			*pbuf = (char *)sr->sr_extent;
> +			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
> +			break;
> +		}
> +		/* No extents? Fall through to reading data len */
> +		fallthrough;
> +	case CEPH_SPARSE_READ_DATA_LEN:
> +		*plen = sizeof(sr->sr_datalen);
> +		*pbuf = (char *)&sr->sr_datalen;
> +		sr->sr_state = CEPH_SPARSE_READ_DATA;
> +		break;
> +	case CEPH_SPARSE_READ_DATA:
> +		if (sr->sr_index >= count)
> +			return 0;
> +		if (sr->sr_index == 0) {
> +			/* last extent */
> +			eoff = le64_to_cpu(sr->sr_extent[count - 1].off);
> +			elen = le64_to_cpu(sr->sr_extent[count - 1].len);
> +
> +			/* set up cursor to end of last extent */
> +			ceph_msg_data_cursor_init(cursor, con->in_msg,
> +						  eoff + elen - sr->sr_req_off);
> +		}
> +
> +		eoff = le64_to_cpu(sr->sr_extent[sr->sr_index].off);
> +		elen = le64_to_cpu(sr->sr_extent[sr->sr_index].len);
> +
> +		dout("[%d] ext %d off 0x%llx len 0x%llx\n", o->o_osd, sr->sr_index, eoff, elen);
> +
> +		/* zero out anything from sr_pos to start of extent */
> +		if (sr->sr_pos < eoff)
> +			zero_len(cursor, eoff - sr->sr_pos);
> +
> +		/* Set position to end of extent */
> +		sr->sr_pos = eoff + elen;
> +
> +		/* send back the new length */
> +		*plen = elen;
> +
> +		/* Bump the array index */
> +		++sr->sr_index;
> +		break;
> +	}
> +	return 1;
> +}
> +
>  static const struct ceph_connection_operations osd_con_ops = {
>  	.get = osd_get_con,
>  	.put = osd_put_con,
> +	.sparse_read = osd_sparse_read,
>  	.alloc_msg = osd_alloc_msg,
>  	.dispatch = osd_dispatch,
>  	.fault = osd_fault,
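
The reply layout described in the osd_client.h comment above (a 32-bit
extent count, the extent array, a 32-bit data length, then the
concatenated data) can also be shown outside the state machine. Below is
a stand-alone sketch that parses a fully buffered reply; it assumes a
little-endian host, and the struct and function names are hypothetical:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Mirrors struct ceph_sparse_extent: two u64s, so no padding. */
struct extent {
	uint64_t off;
	uint64_t len;
};

/*
 * Parse one SPARSE_READ reply sitting complete in memory. The kernel
 * never buffers the reply like this (it receives it piecewise via the
 * state machine); this exists purely to show the wire layout.
 */
static int parse_sparse_reply(const uint8_t *p, size_t avail)
{
	uint32_t count, datalen, i;

	if (avail < sizeof(count))
		return -1;
	memcpy(&count, p, sizeof(count));
	p += sizeof(count);

	if (avail < sizeof(count) + (size_t)count * sizeof(struct extent) +
		    sizeof(datalen))
		return -1;

	for (i = 0; i < count; i++) {
		struct extent e;

		memcpy(&e, p, sizeof(e));
		p += sizeof(e);
		printf("extent %u: off 0x%llx len 0x%llx\n", i,
		       (unsigned long long)e.off, (unsigned long long)e.len);
	}

	memcpy(&datalen, p, sizeof(datalen));
	printf("%u bytes of extent data follow\n", datalen);
	return 0;
}

Note that the in-kernel state machine avoids exactly this kind of
up-front buffering: it learns the extent count only after alloc_msg has
returned, which is why the extent array has to be allocated on the fly.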

Patch

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 3431011f364d..42eb1628a66d 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -29,6 +29,43 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 
 #define CEPH_HOMELESS_OSD	-1
 
+enum ceph_sparse_read_state {
+	CEPH_SPARSE_READ_HDR	= 0,
+	CEPH_SPARSE_READ_EXTENTS,
+	CEPH_SPARSE_READ_DATA_LEN,
+	CEPH_SPARSE_READ_DATA,
+};
+
+/* A single extent in a SPARSE_READ reply */
+struct ceph_sparse_extent {
+	__le64	off;
+	__le64	len;
+} __attribute__((packed));
+
+/*
+ * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of
+ * 64-bit offset/length pairs, and then all of the actual file data
+ * concatenated after it (sans holes).
+ *
+ * Unfortunately, we don't know how long the extent array is until we've
+ * started reading the data section of the reply, so for a real sparse read, we
+ * have to allocate the array after alloc_msg returns.
+ *
+ * For the common case of a single extent, we keep an embedded extent here so
+ * we can avoid the extra allocation.
+ */
+struct ceph_sparse_read {
+	enum ceph_sparse_read_state	sr_state;	/* state machine state */
+	u64				sr_req_off;	/* orig request offset */
+	u64				sr_req_len;	/* orig request length */
+	u64				sr_pos;		/* current pos in buffer */
+	int				sr_index;	/* current extent index */
+	__le32				sr_datalen;	/* length of actual data */
+	__le32				sr_count;	/* extent count */
+	struct ceph_sparse_extent	*sr_extent;	/* extent array */
+	struct ceph_sparse_extent	sr_emb_ext[1];	/* embedded extent */
+};
+
 /* a given osd we're communicating with */
 struct ceph_osd {
 	refcount_t o_ref;
@@ -46,6 +83,7 @@ struct ceph_osd {
 	unsigned long lru_ttl;
 	struct list_head o_keepalive_item;
 	struct mutex lock;
+	struct ceph_sparse_read	o_sparse_read;
 };
 
 #define CEPH_OSD_SLAB_OPS	2
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 1c5815530e0d..f519b5727ee3 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 
 	switch (op->op) {
 	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_SPARSE_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
 		ceph_osd_data_release(&op->extent.osd_data);
@@ -706,6 +707,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
 		/* reply */
 		case CEPH_OSD_OP_STAT:
 		case CEPH_OSD_OP_READ:
+		case CEPH_OSD_OP_SPARSE_READ:
 		case CEPH_OSD_OP_LIST_WATCHERS:
 			*num_reply_data_items += 1;
 			break;
@@ -775,7 +777,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
-	       opcode != CEPH_OSD_OP_TRUNCATE);
+	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
 
 	op->extent.offset = offset;
 	op->extent.length = length;
@@ -984,6 +986,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_STAT:
 		break;
 	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_SPARSE_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
 	case CEPH_OSD_OP_ZERO:
@@ -1080,7 +1083,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
-	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
+	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
+	       opcode != CEPH_OSD_OP_SPARSE_READ);
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 					GFP_NOFS);
@@ -2037,6 +2041,7 @@ static void setup_request_data(struct ceph_osd_request *req)
 					       &op->raw_data_in);
 			break;
 		case CEPH_OSD_OP_READ:
+		case CEPH_OSD_OP_SPARSE_READ:
 			ceph_osdc_msg_data_add(reply_msg,
 					       &op->extent.osd_data);
 			break;
@@ -2443,6 +2448,21 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked)
 	__submit_request(req, wrlocked);
 }
 
+static void ceph_init_sparse_read(struct ceph_sparse_read *sr, struct ceph_osd_req_op *op)
+{
+	if (sr->sr_extent != sr->sr_emb_ext)
+		kfree(sr->sr_extent);
+	sr->sr_state = CEPH_SPARSE_READ_HDR;
+	sr->sr_req_off = op ? op->extent.offset : 0;
+	sr->sr_req_len = op ? op->extent.length : 0;
+	sr->sr_pos = sr->sr_req_off;
+	sr->sr_index = 0;
+	sr->sr_count = 0;
+	sr->sr_extent = sr->sr_emb_ext;
+	sr->sr_extent[0].off = 0;
+	sr->sr_extent[0].len = 0;
+}
+
 static void finish_request(struct ceph_osd_request *req)
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
@@ -2452,8 +2472,10 @@ static void finish_request(struct ceph_osd_request *req)
 
 	req->r_end_latency = ktime_get();
 
-	if (req->r_osd)
+	if (req->r_osd) {
+		ceph_init_sparse_read(&req->r_osd->o_sparse_read, NULL);
 		unlink_request(req->r_osd, req);
+	}
 	atomic_dec(&osdc->num_requests);
 
 	/*
@@ -3655,6 +3677,8 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 	struct MOSDOpReply m;
 	u64 tid = le64_to_cpu(msg->hdr.tid);
 	u32 data_len = 0;
+	u32 result_len = 0;
+	bool sparse = false;
 	int ret;
 	int i;
 
@@ -3749,21 +3773,32 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 		req->r_ops[i].rval = m.rval[i];
 		req->r_ops[i].outdata_len = m.outdata_len[i];
 		data_len += m.outdata_len[i];
+		if (req->r_ops[i].op == CEPH_OSD_OP_SPARSE_READ)
+			sparse = true;
 	}
+
+	result_len = data_len;
+	if (sparse) {
+		struct ceph_sparse_read *sr = &osd->o_sparse_read;
+
+		/* Fudge the result if this was a sparse read. */
+		result_len = sr->sr_pos - sr->sr_req_off;
+	}
+
 	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
 		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
 		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
 		goto fail_request;
 	}
-	dout("%s req %p tid %llu result %d data_len %u\n", __func__,
-	     req, req->r_tid, m.result, data_len);
+	dout("%s req %p tid %llu result %d data_len %u result_len %u\n", __func__,
+	     req, req->r_tid, m.result, data_len, result_len);
 
 	/*
 	 * Since we only ever request ONDISK, we should only ever get
 	 * one (type of) reply back.
 	 */
 	WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
-	req->r_result = m.result ?: data_len;
+	req->r_result = m.result ?: result_len;
 	finish_request(req);
 	mutex_unlock(&osd->lock);
 	up_read(&osdc->lock);
@@ -5398,6 +5433,21 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 	ceph_msg_put(msg);
 }
 
+static struct ceph_osd_req_op *
+sparse_read_op(struct ceph_osd_request *req)
+{
+	int i;
+
+	if (!(req->r_flags & CEPH_OSD_FLAG_READ))
+		return NULL;
+
+	for (i = 0; i < req->r_num_ops; ++i) {
+		if (req->r_ops[i].op == CEPH_OSD_OP_SPARSE_READ)
+			return &req->r_ops[i];
+	}
+	return NULL;
+}
+
 /*
  * Lookup and return message for incoming reply.  Don't try to do
  * anything about a larger than preallocated data portion of the
@@ -5414,6 +5464,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	int front_len = le32_to_cpu(hdr->front_len);
 	int data_len = le32_to_cpu(hdr->data_len);
 	u64 tid = le64_to_cpu(hdr->tid);
+	struct ceph_osd_req_op *srop;
 
 	down_read(&osdc->lock);
 	if (!osd_registered(osd)) {
@@ -5446,7 +5497,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		req->r_reply = m;
 	}
 
-	if (data_len > req->r_reply->data_length) {
+	srop = sparse_read_op(req);
+
+	if (!srop && (data_len > req->r_reply->data_length)) {
 		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
 			__func__, osd->o_osd, req->r_tid, data_len,
 			req->r_reply->data_length);
@@ -5456,6 +5509,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	}
 
 	m = ceph_msg_get(req->r_reply);
+	m->sparse_read = srop;
+	if (srop)
+		ceph_init_sparse_read(&osd->o_sparse_read, srop);
+
 	dout("get_reply tid %lld %p\n", tid, m);
 
 out_unlock_session:
@@ -5688,9 +5745,101 @@ static int osd_check_message_signature(struct ceph_msg *msg)
 	return ceph_auth_check_message_signature(auth, msg);
 }
 
+static void zero_len(struct ceph_msg_data_cursor *cursor, size_t len)
+{
+	while (len) {
+		struct page *page;
+		size_t poff, plen;
+		bool last = false;
+
+		page = ceph_msg_data_next(cursor, &poff, &plen, &last);
+		if (plen > len)
+			plen = len;
+		zero_user_segment(page, poff, poff + plen);
+		len -= plen;
+		ceph_msg_data_advance(cursor, plen);
+	}
+}
+
+static int osd_sparse_read(struct ceph_connection *con,
+			   struct ceph_msg_data_cursor *cursor,
+			   u64 *plen, char **pbuf)
+{
+	struct ceph_osd *o = con->private;
+	struct ceph_sparse_read *sr = &o->o_sparse_read;
+	u32 count = le32_to_cpu(sr->sr_count);
+	u64 eoff, elen;
+
+	switch (sr->sr_state) {
+	case CEPH_SPARSE_READ_HDR:
+		dout("[%d] request to read 0x%llx~0x%llx\n", o->o_osd, sr->sr_req_off, sr->sr_req_len);
+		/* number of extents */
+		*plen = sizeof(sr->sr_count);
+		*pbuf = (char *)&sr->sr_count;
+		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
+		break;
+	case CEPH_SPARSE_READ_EXTENTS:
+		dout("[%d] got %u extents\n", o->o_osd, count);
+
+		if (count > 0) {
+			if (count > 1) {
+				/* can't use the embedded extent array */
+				sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent),
+							   GFP_NOIO);
+				if (!sr->sr_extent)
+					return -ENOMEM;
+			}
+			*plen = count * sizeof(*sr->sr_extent);
+			*pbuf = (char *)sr->sr_extent;
+			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
+			break;
+		}
+		/* No extents? Fall through to reading data len */
+		fallthrough;
+	case CEPH_SPARSE_READ_DATA_LEN:
+		*plen = sizeof(sr->sr_datalen);
+		*pbuf = (char *)&sr->sr_datalen;
+		sr->sr_state = CEPH_SPARSE_READ_DATA;
+		break;
+	case CEPH_SPARSE_READ_DATA:
+		if (sr->sr_index >= count)
+			return 0;
+		if (sr->sr_index == 0) {
+			/* last extent */
+			eoff = le64_to_cpu(sr->sr_extent[count - 1].off);
+			elen = le64_to_cpu(sr->sr_extent[count - 1].len);
+
+			/* set up cursor to end of last extent */
+			ceph_msg_data_cursor_init(cursor, con->in_msg,
+						  eoff + elen - sr->sr_req_off);
+		}
+
+		eoff = le64_to_cpu(sr->sr_extent[sr->sr_index].off);
+		elen = le64_to_cpu(sr->sr_extent[sr->sr_index].len);
+
+		dout("[%d] ext %d off 0x%llx len 0x%llx\n", o->o_osd, sr->sr_index, eoff, elen);
+
+		/* zero out anything from sr_pos to start of extent */
+		if (sr->sr_pos < eoff)
+			zero_len(cursor, eoff - sr->sr_pos);
+
+		/* Set position to end of extent */
+		sr->sr_pos = eoff + elen;
+
+		/* send back the new length */
+		*plen = elen;
+
+		/* Bump the array index */
+		++sr->sr_index;
+		break;
+	}
+	return 1;
+}
+
 static const struct ceph_connection_operations osd_con_ops = {
 	.get = osd_get_con,
 	.put = osd_put_con,
+	.sparse_read = osd_sparse_read,
 	.alloc_msg = osd_alloc_msg,
 	.dispatch = osd_dispatch,
 	.fault = osd_fault,
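
With the opcode checks above relaxed, issuing a sparse read differs from
a plain read only in the opcode passed to the extent-op initializer. A
hedged caller-side sketch (the helper name is hypothetical; request
allocation, reply buffers, and submission follow the normal READ path
and are elided):

/*
 * Sketch: initialize op 0 of an already-allocated OSD request as a
 * sparse read of `len` bytes at offset `off`, with no truncation.
 */
static void setup_sparse_read_op(struct ceph_osd_request *req,
				 u64 off, u64 len)
{
	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_SPARSE_READ,
			       off, len, 0, 0);
}

On completion, req->r_result carries the fudged length computed in
handle_reply() (sr_pos - sr_req_off) rather than the raw data_len, so
the caller sees how far into the requested range the read reached.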