@@ -365,6 +365,12 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)
notifier_list_init(&blk->remove_bs_notifiers);
notifier_list_init(&blk->insert_bs_notifiers);
+ /* for rehandle */
+ blk->reinfo.enable = false;
+ blk->reinfo.ts = NULL;
+ qatomic_set(&blk->reinfo.in_flight, 0);
+ QTAILQ_INIT(&blk->reinfo.re_aios);
+
QLIST_INIT(&blk->aio_notifiers);
QTAILQ_INSERT_TAIL(&block_backends, blk, link);
@@ -1425,8 +1431,16 @@ static const AIOCBInfo blk_aio_em_aiocb_info = {
.get_aio_context = blk_aio_em_aiocb_get_aio_context,
};
+static void blk_rehandle_timer_cb(void *opaque);
+static void blk_rehandle_aio_complete(BlkAioEmAIOCB *acb);
+
static void blk_aio_complete(BlkAioEmAIOCB *acb)
{
+ if (acb->rwco.blk->reinfo.enable) {
+ blk_rehandle_aio_complete(acb);
+ return;
+ }
+
if (acb->has_returned) {
acb->common.cb(acb->common.opaque, acb->rwco.ret);
blk_dec_in_flight(acb->rwco.blk);
@@ -1459,6 +1473,7 @@ static BlockAIOCB *blk_aio_prwv(BlockBackend *blk, int64_t offset, int bytes,
.ret = NOT_DONE,
};
acb->bytes = bytes;
+ acb->co_entry = co_entry;
acb->has_returned = false;
co = qemu_coroutine_create(co_entry, acb);
@@ -2054,6 +2069,20 @@ static int blk_do_set_aio_context(BlockBackend *blk, AioContext *new_context,
throttle_group_attach_aio_context(tgm, new_context);
bdrv_drained_end(bs);
}
+
+ if (blk->reinfo.enable) {
+ if (blk->reinfo.ts) {
+ timer_del(blk->reinfo.ts);
+ timer_free(blk->reinfo.ts);
+ }
+ blk->reinfo.ts = aio_timer_new(new_context, QEMU_CLOCK_REALTIME,
+ SCALE_MS, blk_rehandle_timer_cb,
+ blk);
+ if (qatomic_read(&blk->reinfo.in_flight)) {
+ timer_mod(blk->reinfo.ts,
+ qemu_clock_get_ms(QEMU_CLOCK_REALTIME));
+ }
+ }
}
blk->ctx = new_context;
@@ -2406,6 +2435,66 @@ static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter)
}
}
+static void blk_rehandle_insert_aiocb(BlockBackend *blk, BlkAioEmAIOCB *acb)
+{
+ assert(blk->reinfo.enable);
+
+ qatomic_inc(&blk->reinfo.in_flight);
+ QTAILQ_INSERT_TAIL(&blk->reinfo.re_aios, acb, list);
+ timer_mod(blk->reinfo.ts, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) +
+ blk->reinfo.timer_interval_ms);
+}
+
+static void blk_rehandle_remove_aiocb(BlockBackend *blk, BlkAioEmAIOCB *acb)
+{
+ QTAILQ_REMOVE(&blk->reinfo.re_aios, acb, list);
+ qatomic_dec(&blk->reinfo.in_flight);
+}
+
+static void blk_rehandle_timer_cb(void *opaque)
+{
+ BlockBackend *blk = opaque;
+ BlockBackendRehandleInfo *reinfo = &blk->reinfo;
+ BlkAioEmAIOCB *acb, *tmp;
+ Coroutine *co;
+
+ aio_context_acquire(blk_get_aio_context(blk));
+ QTAILQ_FOREACH_SAFE(acb, &reinfo->re_aios, list, tmp) {
+ if (acb->rwco.ret == NOT_DONE) {
+ continue;
+ }
+
+ blk_inc_in_flight(acb->rwco.blk);
+ acb->rwco.ret = NOT_DONE;
+ acb->has_returned = false;
+ blk_rehandle_remove_aiocb(acb->rwco.blk, acb);
+
+ co = qemu_coroutine_create(acb->co_entry, acb);
+ qemu_coroutine_enter(co);
+
+ acb->has_returned = true;
+ if (acb->rwco.ret != NOT_DONE) {
+ replay_bh_schedule_oneshot_event(blk_get_aio_context(blk),
+ blk_aio_complete_bh, acb);
+ }
+ }
+ aio_context_release(blk_get_aio_context(blk));
+}
+
+static void blk_rehandle_aio_complete(BlkAioEmAIOCB *acb)
+{
+ if (acb->has_returned) {
+ blk_dec_in_flight(acb->rwco.blk);
+ if (acb->rwco.ret == -EIO) {
+ blk_rehandle_insert_aiocb(acb->rwco.blk, acb);
+ return;
+ }
+
+ acb->common.cb(acb->common.opaque, acb->rwco.ret);
+ qemu_aio_unref(acb);
+ }
+}
+
void blk_register_buf(BlockBackend *blk, void *host, size_t size)
{
bdrv_register_buf(blk_bs(blk), host, size);