@@ -336,6 +336,10 @@ struct ceph_connection_v1_info {
int in_base_pos; /* bytes read */
+ /* sparse reads */
+ struct kvec in_sr_kvec; /* current location to receive into */
+ u64 in_sr_len; /* amount of data in this extent */
+
/* message in temps */
u8 in_tag; /* protocol control byte */
struct ceph_msg_header in_hdr;
@@ -157,9 +157,9 @@ static size_t sizeof_footer(struct ceph_connection *con)
static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
- /* Initialize data cursor */
-
- ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
+ /* Initialize data cursor if it's not a sparse read */
+ if (!msg->sparse_read)
+ ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
}
/*
@@ -964,9 +964,9 @@ static void process_ack(struct ceph_connection *con)
prepare_read_tag(con);
}
-static int read_partial_message_section(struct ceph_connection *con,
- struct kvec *section,
- unsigned int sec_len, u32 *crc)
+static int read_partial_message_chunk(struct ceph_connection *con,
+ struct kvec *section,
+ unsigned int sec_len, u32 *crc)
{
int ret, left;
@@ -982,11 +982,91 @@ static int read_partial_message_section(struct ceph_connection *con,
section->iov_len += ret;
}
if (section->iov_len == sec_len)
- *crc = crc32c(0, section->iov_base, section->iov_len);
+ *crc = crc32c(*crc, section->iov_base, section->iov_len);
return 1;
}
+static inline int read_partial_message_section(struct ceph_connection *con,
+ struct kvec *section,
+ unsigned int sec_len, u32 *crc)
+{
+ *crc = 0;
+ return read_partial_message_chunk(con, section, sec_len, crc);
+}
+
+static int read_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
+{
+ struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
+ bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);
+
+ if (do_bounce && unlikely(!con->bounce_page)) {
+ con->bounce_page = alloc_page(GFP_NOIO);
+ if (!con->bounce_page) {
+ pr_err("failed to allocate bounce page\n");
+ return -ENOMEM;
+ }
+ }
+
+ while (cursor->sr_resid > 0) {
+ struct page *page, *rpage;
+ size_t off, len;
+ int ret;
+
+ page = ceph_msg_data_next(cursor, &off, &len);
+ rpage = do_bounce ? con->bounce_page : page;
+
+ /* clamp to what remains in extent */
+ len = min_t(int, len, cursor->sr_resid);
+ ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len);
+ if (ret <= 0)
+ return ret;
+ *crc = ceph_crc32c_page(*crc, rpage, off, ret);
+ ceph_msg_data_advance(cursor, (size_t)ret);
+ cursor->sr_resid -= ret;
+ if (do_bounce)
+ memcpy_page(page, off, rpage, off, ret);
+ }
+ return 1;
+}
+
+static int read_sparse_msg_data(struct ceph_connection *con)
+{
+ struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
+ bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
+ u32 crc = 0;
+ int ret = 1;
+
+ if (do_datacrc)
+ crc = con->in_data_crc;
+
+ do {
+ if (con->v1.in_sr_kvec.iov_base)
+ ret = read_partial_message_chunk(con,
+ &con->v1.in_sr_kvec,
+ con->v1.in_sr_len,
+ &crc);
+ else if (cursor->sr_resid > 0)
+ ret = read_sparse_msg_extent(con, &crc);
+
+ if (ret <= 0) {
+ if (do_datacrc)
+ con->in_data_crc = crc;
+ return ret;
+ }
+
+ memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
+ ret = con->ops->sparse_read(con, cursor,
+ (char **)&con->v1.in_sr_kvec.iov_base);
+ con->v1.in_sr_len = ret;
+ } while (ret > 0);
+
+ if (do_datacrc)
+ con->in_data_crc = crc;
+
+ return ret < 0 ? ret : 1; /* must return > 0 to indicate success */
+}
+
static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
@@ -1177,7 +1257,9 @@ static int read_partial_message(struct ceph_connection *con)
if (!m->num_data_items)
return -EIO;
- if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
+ if (m->sparse_read)
+ ret = read_sparse_msg_data(con);
+ else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
ret = read_partial_msg_data_bounce(con);
else
ret = read_partial_msg_data(con);