diff mbox series

[RFC,6/9] block/curl: Cache downloaded blocks

Message ID 20200818110845.3825105-7-david.edmondson@oracle.com
State New
Headers show
Series block/curl: Add caching of data downloaded from the remote server | expand

Commit Message

David Edmondson Aug. 18, 2020, 11:08 a.m. UTC
In the hope that they will be referred to multiple times, cache the
blocks downloaded from the remote server.

Signed-off-by: David Edmondson <david.edmondson@oracle.com>
---
 block/curl.c       | 321 +++++++++++++++++++++++++++++++++++++++------
 block/trace-events |   3 +
 2 files changed, 287 insertions(+), 37 deletions(-)
diff mbox series

Patch

diff --git a/block/curl.c b/block/curl.c
index b2d02818a9..0ea9eedebd 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -81,11 +81,29 @@  static CURLMcode __curl_multi_socket_action(CURLM *multi_handle,
 /* Must be a non-zero power of 2. */
 #define CURL_BLOCK_OPT_BLOCKSIZE_DEFAULT (256 * 1024)
 
+/* The maximum number of blocks to store in the cache. */
+#define CURL_BLOCK_CACHE_MAX_BLOCKS 100
+/* The number of heads in the hash table. */
+#define CURL_BLOCK_CACHE_HASH 37
+
 struct BDRVCURLState;
 struct CURLState;
 
 static bool libcurl_initialized;
 
+typedef struct block {
+    QLIST_ENTRY(block) hash; /* Chain of blocks in one cache_hash[] bucket. */
+    QLIST_ENTRY(block) free; /* Link on the cache_free list. */
+    QTAILQ_ENTRY(block) lru; /* Link on cache_lru (cached and use == 0). */
+    bool hashed; /* block_t contains data and is in cache_hash[]. */
+    int use;     /* Use count; non-zero while someone holds the block. */
+
+    uint64_t start; /* Offset of first byte (block-aligned). */
+    uint64_t count; /* Valid bytes received into buf so far. */
+
+    char *buf;      /* Data buffer of blocksize bytes. */
+} block_t;
+
 typedef struct CURLAIOCB {
     Coroutine *co;
     QEMUIOVector *qiov;
@@ -117,12 +135,11 @@  typedef struct CURLState
     CURLAIOCB *acb[CURL_NUM_ACB];
     CURL *curl;
     QLIST_HEAD(, CURLSocket) sockets;
-    char *orig_buf;
-    uint64_t buf_start;
     size_t buf_off;
     char range[128];
     char errmsg[CURL_ERROR_SIZE];
     char in_use;
+    block_t *cache_block;
 } CURLState;
 
 typedef struct BDRVCURLState {
@@ -144,11 +161,17 @@  typedef struct BDRVCURLState {
     char *proxypassword;
     size_t offset;
     size_t blocksize;
+    int cache_allocated; /* The number of block_t currently allocated. */
+    QLIST_HEAD(, block) cache_free;
+    QTAILQ_HEAD(, block) cache_lru;
+    QLIST_HEAD(, block) * cache_hash;
 } BDRVCURLState;
 
 static void curl_clean_state(CURLState *s);
 static void curl_multi_do(void *arg);
 
+static void curl_cache_free(BDRVCURLState *s, block_t *b);
+
 /* Align "n" to the start of the containing block. */
 static inline uint64_t curl_block_align(BDRVCURLState *s, uint64_t n)
 {
@@ -161,6 +184,198 @@  static inline uint64_t curl_block_offset(BDRVCURLState *s, uint64_t n)
     return n & (s->blocksize - 1);
 }
 
+static uint64_t curl_cache_hash(BDRVCURLState *s, uint64_t n)
+{
+    return curl_block_align(s, n) % CURL_BLOCK_CACHE_HASH; /* Bucket index. */
+}
+
+/* Initialise an empty block cache.  Returns false if allocation fails. */
+static bool curl_cache_init(BDRVCURLState *s)
+{
+    s->cache_allocated = 0;
+    QLIST_INIT(&s->cache_free);
+    QTAILQ_INIT(&s->cache_lru);
+
+    /* Size each element by the bucket head type, not the array pointer. */
+    s->cache_hash = g_try_malloc(CURL_BLOCK_CACHE_HASH *
+                                 sizeof(*s->cache_hash));
+    if (!s->cache_hash) {
+        return false;
+    }
+    for (int i = 0; i < CURL_BLOCK_CACHE_HASH; i++) {
+        QLIST_INIT(&s->cache_hash[i]);
+    }
+    return true;
+}
+
+/* Release every cached block and then the hash table itself. */
+static void curl_cache_deinit(BDRVCURLState *s)
+{
+    block_t *b;
+
+    /* Blocks holding data are hashed; idle ones are also on the LRU list. */
+    for (int i = 0; i < CURL_BLOCK_CACHE_HASH; i++) {
+        while (!QLIST_EMPTY(&s->cache_hash[i])) {
+            b = QLIST_FIRST(&s->cache_hash[i]);
+            QLIST_REMOVE(b, hash);
+            b->hashed = false;
+            curl_cache_free(s, b);
+        }
+    }
+    /* The LRU entries were all just freed; don't leave the head dangling. */
+    QTAILQ_INIT(&s->cache_lru);
+
+    while (!QLIST_EMPTY(&s->cache_free)) {
+        b = QLIST_FIRST(&s->cache_free);
+        QLIST_REMOVE(b, free);
+        curl_cache_free(s, b);
+    }
+    assert(s->cache_allocated == 0);
+
+    g_free(s->cache_hash);
+    s->cache_hash = NULL;
+}
+
+/*
+ * Allocate a zeroed block_t together with a data buffer of s->blocksize
+ * bytes and count it in s->cache_allocated.  Returns NULL if either
+ * allocation fails.
+ */
+static block_t *curl_cache_alloc(BDRVCURLState *s)
+{
+    block_t *blk = g_try_malloc0(sizeof(*blk));
+    char *data = blk ? g_try_malloc(s->blocksize) : NULL;
+
+    if (!data) {
+        g_free(blk); /* No-op when the block_t itself was not allocated. */
+        return NULL;
+    }
+    blk->buf = data;
+
+    s->cache_allocated++;
+    trace_curl_cache_alloc(s->cache_allocated);
+    return blk;
+}
+
+/* Destroy a block that is neither in use nor hashed. */
+static void curl_cache_free(BDRVCURLState *s, block_t *b)
+{
+    assert(b->use == 0);
+    assert(!b->hashed);
+
+    s->cache_allocated--;
+    trace_curl_cache_free(s->cache_allocated);
+
+    g_free(b->buf);
+    g_free(b);
+}
+
+/*
+ * Get a block to fill with new data, returned with a use count of 1.
+ * Tries the free list, then a fresh allocation, then eviction from the
+ * LRU list.  Returns NULL if none of those yields a block.
+ */
+static block_t *curl_cache_get(BDRVCURLState *s)
+{
+    block_t *blk;
+
+    /* Prefer recycling a block from the free list. */
+    if (!QLIST_EMPTY(&s->cache_free)) {
+        blk = QLIST_FIRST(&s->cache_free);
+        QLIST_REMOVE(blk, free);
+        assert(blk->use == 0);
+        assert(!blk->hashed);
+        blk->use++;
+        return blk;
+    }
+
+    /* Below the limit the cache may still grow. */
+    if (s->cache_allocated < CURL_BLOCK_CACHE_MAX_BLOCKS) {
+        blk = curl_cache_alloc(s);
+        if (blk) {
+            blk->use++;
+            return blk;
+        }
+    }
+
+    /* Last resort is eviction, which needs an idle cached block. */
+    if (QTAILQ_EMPTY(&s->cache_lru)) {
+        return NULL;
+    }
+
+    /* Take the head of the LRU list and unlink it from the hash too. */
+    blk = QTAILQ_FIRST(&s->cache_lru);
+    QTAILQ_REMOVE(&s->cache_lru, blk, lru);
+    QLIST_REMOVE(blk, hash);
+
+    assert(blk->use == 0);
+    blk->hashed = false;
+    blk->use++;
+    return blk;
+}
+
+/* Drop one reference; keep "b" cached if "valid", otherwise recycle it. */
+static void curl_cache_put(BDRVCURLState *s, block_t *b, bool valid)
+{
+    b->use--;
+
+    if (valid && !b->hashed) {
+        b->hashed = true;
+        QLIST_INSERT_HEAD(&s->cache_hash[curl_cache_hash(s, b->start)],
+                          b, hash);
+    } else if (!valid && b->hashed) {
+        /* Stale data must not stay reachable through the hash table. */
+        QLIST_REMOVE(b, hash);
+        b->hashed = false;
+    }
+    /* Only an unreferenced block may be queued for re-use. */
+    if (b->use == 0 && b->hashed) {
+        QTAILQ_INSERT_TAIL(&s->cache_lru, b, lru);
+    } else if (b->use == 0) {
+        QLIST_INSERT_HEAD(&s->cache_free, b, free);
+    }
+}
+
+/* Find the cached block holding byte "start" and take a reference on it. */
+static block_t *cache_lookup(BDRVCURLState *s, uint64_t start)
+{
+    block_t *b;
+
+    QLIST_FOREACH(b, &s->cache_hash[curl_cache_hash(s, start)], hash) {
+        if (b->start <= start && start < b->start + b->count) {
+            assert(b->hashed);
+            /* Only idle blocks sit on the LRU list; unlink on first use. */
+            if (b->use++ == 0) {
+                QTAILQ_REMOVE(&s->cache_lru, b, lru);
+            }
+            return b;
+        }
+    }
+
+    return NULL;
+}
+
+/* Serve "acb" from a cached block; returns false on a cache miss. */
+static bool curl_cache_find(BDRVCURLState *s, CURLAIOCB *acb)
+{
+    block_t *b = cache_lookup(s, acb->offset);
+    uint64_t n = b ? MIN(acb->bytes, b->start + b->count - acb->offset) : 0;
+
+    if (!b) {
+        return false;
+    }
+    trace_curl_cache_hit(qemu_coroutine_self(), acb->offset, acb->bytes);
+    /* Copy only the bytes that were downloaded; zero-fill the rest. */
+    qemu_iovec_from_buf(acb->qiov, acb->qiov_offset,
+                        b->buf + curl_block_offset(s, acb->offset), n);
+    if (n < acb->bytes) {
+        qemu_iovec_memset(acb->qiov, acb->qiov_offset + n, 0, acb->bytes - n);
+    }
+    curl_cache_put(s, b, true);
+    acb->ret = 0;
+    return true;
+}
+
 #ifdef NEED_CURL_TIMER_CALLBACK
 /* Called from curl_multi_do_locked, with s->mutex held.  */
 static int curl_timer_cb(CURLM *multi, long timeout_ms, void *opaque)
@@ -274,11 +489,12 @@  static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
 {
     CURLState *state = (CURLState *)opaque;
     BDRVCURLState *s = state->s;
+    block_t *b = state->cache_block;
     size_t realsize = size * nmemb;
 
     trace_curl_read_cb(realsize);
 
-    if (!state || !state->orig_buf) {
+    if (!state || !b) {
         goto read_end;
     }
 
@@ -287,8 +503,9 @@  static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
         goto read_end;
     }
     realsize = MIN(realsize, s->blocksize - state->buf_off);
-    memcpy(state->orig_buf + state->buf_off, ptr, realsize);
+    memcpy(b->buf + state->buf_off, ptr, realsize);
     state->buf_off += realsize;
+    b->count += realsize;
 
 read_end:
     /* curl will error out if we do not return this value */
@@ -296,35 +513,38 @@  read_end:
 }
 
 /* Called with s->mutex held.  */
-static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len,
-                          CURLAIOCB *acb)
+static bool curl_pending_find(BDRVCURLState *s, CURLAIOCB *acb)
 {
     int i;
+    uint64_t start = acb->offset;
+    uint64_t len = acb->bytes;
     uint64_t end = start + len;
     uint64_t clamped_end = MIN(end, s->len);
     uint64_t clamped_len = clamped_end - start;
 
     for (i = 0; i < CURL_NUM_STATES; i++) {
         CURLState *state = &s->states[i];
-        /* The end of the currently valid data. */
-        uint64_t buf_end = state->buf_start + state->buf_off;
-        /* The end of the valid data when the IO completes. */
-        uint64_t buf_fend = state->buf_start + s->blocksize;
+        block_t *b = state->cache_block;
+        uint64_t buf_end, buf_fend;
 
-        if (!state->orig_buf)
-            continue;
-        if (!state->buf_off)
+        if (!b) {
             continue;
+        }
+
+        /* The end of the currently valid data. */
+        buf_end = b->start + state->buf_off;
+        /* The end of the valid data when the IO completes. */
+        buf_fend = b->start + s->blocksize;
 
         /*
          * Does the existing buffer cover our section?
          */
-        if ((start >= state->buf_start) &&
+        if ((start >= b->start) &&
             (start <= buf_end) &&
-            (clamped_end >= state->buf_start) &&
+            (clamped_end >= b->start) &&
             (clamped_end <= buf_end))
         {
-            char *buf = state->orig_buf + curl_block_offset(s, start);
+            char *buf = b->buf + curl_block_offset(s, start);
 
             trace_curl_pending_hit(qemu_coroutine_self(),
                                    start, len);
@@ -343,9 +563,9 @@  static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len,
          * aiocb.
          */
         if (state->in_use &&
-            (start >= state->buf_start) &&
+            (start >= b->start) &&
             (start <= buf_fend) &&
-            (clamped_end >= state->buf_start) &&
+            (clamped_end >= b->start) &&
             (clamped_end <= buf_fend))
         {
             int j;
@@ -388,10 +608,10 @@  static void curl_multi_check_completion(BDRVCURLState *s)
             int i;
             CURLState *state = NULL;
             bool error = msg->data.result != CURLE_OK;
+            block_t *b;
 
             curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE,
                               (char **)&state);
-
             if (error) {
                 static int errcount = 100;
 
@@ -406,6 +626,8 @@  static void curl_multi_check_completion(BDRVCURLState *s)
                 }
             }
 
+            b = state->cache_block;
+
             for (i = 0; i < CURL_NUM_ACB; i++) {
                 CURLAIOCB *acb = state->acb[i];
 
@@ -418,7 +640,7 @@  static void curl_multi_check_completion(BDRVCURLState *s)
                     assert(state->buf_off >= acb->end);
 
                     qemu_iovec_from_buf(acb->qiov, acb->qiov_offset,
-                                        state->orig_buf + acb->start,
+                                        b->buf + acb->start,
                                         acb->end - acb->start);
 
                     if (acb->end - acb->start < acb->bytes) {
@@ -436,6 +658,9 @@  static void curl_multi_check_completion(BDRVCURLState *s)
                 qemu_mutex_lock(&s->mutex);
             }
 
+            curl_cache_put(s, b, true);
+            state->cache_block = NULL;
+
             curl_clean_state(state);
             break;
         }
@@ -612,8 +837,10 @@  static void curl_detach_aio_context(BlockDriverState *bs)
             curl_easy_cleanup(s->states[i].curl);
             s->states[i].curl = NULL;
         }
-        g_free(s->states[i].orig_buf);
-        s->states[i].orig_buf = NULL;
+        if (s->states[i].cache_block) {
+            curl_cache_free(s, s->states[i].cache_block);
+            s->states[i].cache_block = NULL;
+        }
     }
     if (s->multi) {
         curl_multi_cleanup(s->multi);
@@ -868,6 +1095,10 @@  static int curl_open(BlockDriverState *bs, QDict *options, int flags,
     }
     trace_curl_open_size(s->len);
 
+    if (!curl_cache_init(s)) {
+        goto out;
+    }
+
     qemu_mutex_lock(&s->mutex);
     curl_clean_state(state);
     qemu_mutex_unlock(&s->mutex);
@@ -898,6 +1129,7 @@  static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
 {
     CURLState *state;
     int running;
+    block_t *b;
 
     BDRVCURLState *s = bs->opaque;
 
@@ -910,11 +1142,16 @@  static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
 
     qemu_mutex_lock(&s->mutex);
 
+    /* Can this request be handled using a cached block? */
+    if (curl_cache_find(s, acb)) {
+        goto out;
+    }
+
     /*
-     * Check whether the requested data can be found in an existing or
-     * pending IO request.
+     * Check whether the requested data can be found in a pending IO
+     * request.
      */
-    if (curl_find_buf(s, acb->offset, acb->bytes, acb)) {
+    if (curl_pending_find(s, acb)) {
         goto out;
     }
 
@@ -935,25 +1172,34 @@  static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
         goto out;
     }
 
-    acb->start = curl_block_offset(s, acb->offset);
-    acb->end = acb->start + MIN(acb->bytes, s->len - acb->offset);
-
-    state->buf_off = 0;
-    state->buf_start = curl_block_align(s, acb->offset);
-    if (!state->orig_buf) {
-        state->orig_buf = g_try_malloc(s->blocksize);
-    }
-    if (!state->orig_buf) {
+    b = curl_cache_get(s);
+    if (!b) {
         curl_clean_state(state);
         acb->ret = -ENOMEM;
         goto out;
     }
+
+    state->cache_block = b;
+
+    /*
+     * Any already cached or in-progress IO for
+     * curl_block_align(s, acb->offset) would have been found by
+     * curl_cache_find() or curl_pending_find() respectively, so this
+     * must be a new request for that block.
+     */
+    b->start = curl_block_align(s, acb->offset);
+    b->count = 0; /* Nothing read so far. */
+
+    acb->start = curl_block_offset(s, acb->offset);
+    acb->end = acb->start + MIN(acb->bytes, s->len - acb->offset);
+
+    state->buf_off = 0;
     state->acb[0] = acb;
 
     snprintf(state->range, 127, "%" PRIu64 "-%" PRIu64,
-             s->offset + state->buf_start,
-             s->offset + state->buf_start + s->blocksize);
-    trace_curl_setup_preadv(qemu_coroutine_self(), state->buf_start,
+             s->offset + b->start,
+             s->offset + b->start + s->blocksize);
+    trace_curl_setup_preadv(qemu_coroutine_self(), b->start,
                             s->blocksize);
     curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
 
@@ -1027,6 +1273,7 @@  static void curl_close(BlockDriverState *bs)
     BDRVCURLState *s = bs->opaque;
 
     trace_curl_close();
+    curl_cache_deinit(s);
     curl_detach_aio_context(bs);
     qemu_mutex_destroy(&s->mutex);
 
diff --git a/block/trace-events b/block/trace-events
index 72b1e927bf..deac37c4ad 100644
--- a/block/trace-events
+++ b/block/trace-events
@@ -206,7 +206,10 @@  curl_setup_preadv(void *co, uint64_t offset, uint64_t bytes) "co %p requests 0x%
 curl_pending_hit(void *co, uint64_t start, uint64_t len) "co %p finds 0x%" PRIx64 " + 0x%" PRIx64
 curl_pending_piggyback(void *co, uint64_t start, uint64_t len) "co %p pending 0x%" PRIx64 " + 0x%" PRIx64
 curl_pending_miss(void *co, uint64_t start, uint64_t len) "co %p misses 0x%" PRIx64 " + 0x%" PRIx64
+curl_cache_hit(void *co, uint64_t start, uint64_t len) "co %p finds 0x%" PRIx64 " + 0x%" PRIx64
 curl_close(void) "close"
+curl_cache_alloc(size_t n) "%zu cache blocks allocated"
+curl_cache_free(size_t n) "%zu cache blocks allocated"
 
 # file-posix.c
 file_xfs_write_zeroes(const char *error) "cannot write zero range (%s)"