@@ -25,6 +25,22 @@ struct ceph_connection_operations {
struct ceph_connection *(*get)(struct ceph_connection *);
void (*put)(struct ceph_connection *);
+ /**
+ * sparse_read: read sparse data
+ * @con: connection we're reading from
+ * @off: offset into msgr data caller should read into
+ * @len: len of the data that msgr should read
+ * @buf: optional buffer to read into
+ *
+ * This should be called more than once, each time setting up to
+ * receive an extent into the correct portion of the buffer (and
+ * zeroing the holes between them).
+ *
+ * Returns 1 if there is more data to be read, 0 if reading is
+ * complete, or -errno if there was an error.
+ */
+ int (*sparse_read)(struct ceph_connection *con, u64 *off, u64 *len, char **buf);
+
/* handle an incoming message. */
void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
@@ -252,6 +268,7 @@ struct ceph_msg {
struct kref kref;
bool more_to_follow;
bool needs_out_seq;
+ bool sparse_read;
int front_alloc_len;
struct ceph_msgpool *pool;
@@ -464,6 +481,8 @@ struct ceph_connection {
struct page *bounce_page;
u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */
+ int sparse_resid;
+
struct timespec64 last_keepalive_ack; /* keepalive2 ack stamp */
struct delayed_work work; /* send|recv work */
@@ -52,14 +52,17 @@
#define FRAME_LATE_STATUS_COMPLETE 0xe
#define FRAME_LATE_STATUS_ABORTED_MASK 0xf
-#define IN_S_HANDLE_PREAMBLE 1
-#define IN_S_HANDLE_CONTROL 2
-#define IN_S_HANDLE_CONTROL_REMAINDER 3
-#define IN_S_PREPARE_READ_DATA 4
-#define IN_S_PREPARE_READ_DATA_CONT 5
-#define IN_S_PREPARE_READ_ENC_PAGE 6
-#define IN_S_HANDLE_EPILOGUE 7
-#define IN_S_FINISH_SKIP 8
+#define IN_S_HANDLE_PREAMBLE 1
+#define IN_S_HANDLE_CONTROL 2
+#define IN_S_HANDLE_CONTROL_REMAINDER 3
+#define IN_S_PREPARE_READ_DATA 4
+#define IN_S_PREPARE_READ_DATA_CONT 5
+#define IN_S_PREPARE_READ_ENC_PAGE 6
+#define IN_S_PREPARE_SPARSE_DATA 7
+#define IN_S_PREPARE_SPARSE_DATA_HDR 8
+#define IN_S_PREPARE_SPARSE_DATA_CONT 9
+#define IN_S_HANDLE_EPILOGUE 10
+#define IN_S_FINISH_SKIP 11
#define OUT_S_QUEUE_DATA 1
#define OUT_S_QUEUE_DATA_CONT 2
@@ -1753,13 +1756,13 @@ static int prepare_read_control_remainder(struct ceph_connection *con)
return 0;
}
-static int prepare_read_data(struct ceph_connection *con)
+static int prepare_read_data_extent(struct ceph_connection *con, int off, int len)
{
struct bio_vec bv;
- con->in_data_crc = -1;
- ceph_msg_data_cursor_init(&con->v2.in_cursor, con->in_msg,
- data_len(con->in_msg));
+ ceph_msg_data_cursor_init(&con->v2.in_cursor, con->in_msg, off+len);
+ if (off)
+ ceph_msg_data_advance(&con->v2.in_cursor, off);
get_bvec_at(&con->v2.in_cursor, &bv);
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
@@ -1775,10 +1778,20 @@ static int prepare_read_data(struct ceph_connection *con)
bv.bv_offset = 0;
}
set_in_bvec(con, &bv);
- con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT;
return 0;
}
+static int prepare_read_data(struct ceph_connection *con)
+{
+ int ret;
+
+ con->in_data_crc = -1;
+ ret = prepare_read_data_extent(con, 0, data_len(con->in_msg));
+ if (ret == 0)
+ con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT;
+ return ret;
+}
+
static void prepare_read_data_cont(struct ceph_connection *con)
{
struct bio_vec bv;
@@ -1819,6 +1832,116 @@ static void prepare_read_data_cont(struct ceph_connection *con)
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
}
+static int prepare_sparse_read_cont(struct ceph_connection *con)
+{
+ int ret;
+ struct bio_vec bv;
+ char *buf = NULL;
+ u64 off = 0, len = 0;
+
+ if (!iov_iter_is_bvec(&con->v2.in_iter))
+ return -EIO;
+
+ if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+ con->in_data_crc = crc32c(con->in_data_crc,
+ page_address(con->bounce_page),
+ con->v2.in_bvec.bv_len);
+
+ get_bvec_at(&con->v2.in_cursor, &bv);
+ memcpy_to_page(bv.bv_page, bv.bv_offset,
+ page_address(con->bounce_page),
+ con->v2.in_bvec.bv_len);
+ } else {
+ con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
+ con->v2.in_bvec.bv_page,
+ con->v2.in_bvec.bv_offset,
+ con->v2.in_bvec.bv_len);
+ }
+
+ ceph_msg_data_advance(&con->v2.in_cursor, con->v2.in_bvec.bv_len);
+ if (con->v2.in_cursor.total_resid) {
+ get_bvec_at(&con->v2.in_cursor, &bv);
+ if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+ bv.bv_page = con->bounce_page;
+ bv.bv_offset = 0;
+ }
+ set_in_bvec(con, &bv);
+ WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT);
+ return 0;
+ }
+
+ /* get next extent */
+ ret = con->ops->sparse_read(con, &off, &len, &buf);
+ if (ret <= 0) {
+ if (ret < 0)
+ return ret;
+
+ reset_in_kvecs(con);
+ add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
+ con->v2.in_state = IN_S_HANDLE_EPILOGUE;
+ return 0;
+ }
+
+ return prepare_read_data_extent(con, off, len);
+}
+
+static int prepare_sparse_read_header(struct ceph_connection *con)
+{
+ int ret;
+ char *buf = NULL;
+ u64 off = 0, len = 0;
+
+ if (!iov_iter_is_kvec(&con->v2.in_iter))
+ return -EIO;
+
+ /* On first call, we have no kvec so don't compute crc */
+ if (con->v2.in_kvec_cnt) {
+ WARN_ON_ONCE(con->v2.in_kvec_cnt > 1);
+ con->in_data_crc = crc32c(con->in_data_crc,
+ con->v2.in_kvecs[0].iov_base,
+ con->v2.in_kvecs[0].iov_len);
+ }
+
+ ret = con->ops->sparse_read(con, &off, &len, &buf);
+ if (ret < 0)
+ return ret;
+ if (ret == 0) {
+ reset_in_kvecs(con);
+ add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
+ con->v2.in_state = IN_S_HANDLE_EPILOGUE;
+ return 0;
+ }
+
+ /* No actual data? */
+ if (WARN_ON_ONCE(!ret))
+ return -EIO;
+
+ if (!buf) {
+ ret = prepare_read_data_extent(con, off, len);
+ if (!ret)
+ con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT;
+ return ret;
+ }
+
+ WARN_ON_ONCE(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_HDR);
+ reset_in_kvecs(con);
+ add_in_kvec(con, buf, len);
+ return 0;
+}
+
+static int prepare_sparse_read_data(struct ceph_connection *con)
+{
+ if (WARN_ON_ONCE(!con->ops->sparse_read))
+ return -EOPNOTSUPP;
+
+ if (!con_secure(con))
+ con->in_data_crc = -1;
+
+ reset_in_kvecs(con);
+ con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_HDR;
+ return prepare_sparse_read_header(con);
+}
+
static int prepare_read_tail_plain(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
@@ -1839,7 +1962,10 @@ static int prepare_read_tail_plain(struct ceph_connection *con)
}
if (data_len(msg)) {
- con->v2.in_state = IN_S_PREPARE_READ_DATA;
+ if (msg->sparse_read)
+ con->v2.in_state = IN_S_PREPARE_SPARSE_DATA;
+ else
+ con->v2.in_state = IN_S_PREPARE_READ_DATA;
} else {
add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
@@ -2893,6 +3019,15 @@ static int populate_in_iter(struct ceph_connection *con)
prepare_read_enc_page(con);
ret = 0;
break;
+ case IN_S_PREPARE_SPARSE_DATA:
+ ret = prepare_sparse_read_data(con);
+ break;
+ case IN_S_PREPARE_SPARSE_DATA_HDR:
+ ret = prepare_sparse_read_header(con);
+ break;
+ case IN_S_PREPARE_SPARSE_DATA_CONT:
+ ret = prepare_sparse_read_cont(con);
+ break;
case IN_S_HANDLE_EPILOGUE:
ret = handle_epilogue(con);
break;
@@ -3501,6 +3636,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con)
void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
{
switch (con->v2.in_state) {
+ case IN_S_PREPARE_SPARSE_DATA: // FIXME
case IN_S_PREPARE_READ_DATA:
revoke_at_prepare_read_data(con);
break;
Add support for a new sparse_read ceph_connection operation. The idea is that the client code can define this operation use it to do special handling for incoming reads. The alloc_msg routine can look at the request and determine whether the reply is expected to be sparse. If it is, then we'll dispatch to a different set of state machine states that will repeatedly call sparse_read get length and placement info for reading the extent map, and the extents themselves. TODO: support for revoke during a sparse read Signed-off-by: Jeff Layton <jlayton@kernel.org> --- include/linux/ceph/messenger.h | 19 ++++ net/ceph/messenger_v2.c | 164 ++++++++++++++++++++++++++++++--- 2 files changed, 169 insertions(+), 14 deletions(-)