@@ -4235,6 +4235,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
dout("handle_caps from mds%d\n", session->s_mds);
+ if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+ return;
+
/* decode */
end = msg->front.iov_base + msg->front.iov_len;
if (msg->front.iov_len < sizeof(*h))
@@ -4336,7 +4339,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
vino.snap, inode);
mutex_lock(&session->s_mutex);
- inc_session_sequence(session);
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
(unsigned)seq);
@@ -4448,6 +4450,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
done_unlocked:
iput(inode);
out:
+ ceph_dec_mds_stopping_blocker(mdsc);
+
ceph_put_string(extra_info.pool_ns);
/* Defer closing the sessions after s_mutex lock being released */
@@ -4904,6 +4904,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
dout("handle_lease from mds%d\n", mds);
+ if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+ return;
+
/* decode */
if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
goto bad;
@@ -4922,8 +4925,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
dname.len, dname.name);
mutex_lock(&session->s_mutex);
- inc_session_sequence(session);
-
if (!inode) {
dout("handle_lease no inode %llx\n", vino.ino);
goto release;
@@ -4985,9 +4986,13 @@ static void handle_lease(struct ceph_mds_client *mdsc,
out:
mutex_unlock(&session->s_mutex);
iput(inode);
+
+ ceph_dec_mds_stopping_blocker(mdsc);
return;
bad:
+ ceph_dec_mds_stopping_blocker(mdsc);
+
pr_err("corrupt lease message\n");
ceph_msg_dump(msg);
}
@@ -5183,6 +5188,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
}
init_completion(&mdsc->safe_umount_waiters);
+ spin_lock_init(&mdsc->stopping_lock);
+ atomic_set(&mdsc->stopping_blockers, 0);
+ init_completion(&mdsc->stopping_waiter);
init_waitqueue_head(&mdsc->session_close_wq);
INIT_LIST_HEAD(&mdsc->waiting_for_map);
mdsc->quotarealms_inodes = RB_ROOT;
@@ -5297,7 +5305,7 @@ void send_flush_mdlog(struct ceph_mds_session *s)
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
dout("pre_umount\n");
- mdsc->stopping = 1;
+ mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
@@ -398,6 +398,11 @@ struct cap_wait {
int want;
};
+enum {
+ CEPH_MDSC_STOPPING_BEGIN = 1,
+ CEPH_MDSC_STOPPING_FLUSHED = 2,
+};
+
/*
* mds client state
*/
@@ -414,7 +419,11 @@ struct ceph_mds_client {
struct ceph_mds_session **sessions; /* NULL for mds if no session */
atomic_t num_sessions;
int max_sessions; /* len of sessions array */
- int stopping; /* true if shutting down */
+
+ spinlock_t stopping_lock; /* protect snap_empty */
+ int stopping; /* the stage of shutting down */
+ atomic_t stopping_blockers;
+ struct completion stopping_waiter;
atomic64_t quotarealms_count; /* # realms with quota */
/*
@@ -47,25 +47,23 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
struct inode *inode;
struct ceph_inode_info *ci;
+ if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+ return;
+
if (msg->front.iov_len < sizeof(*h)) {
pr_err("%s corrupt message mds%d len %d\n", __func__,
session->s_mds, (int)msg->front.iov_len);
ceph_msg_dump(msg);
- return;
+ goto out;
}
- /* increment msg sequence number */
- mutex_lock(&session->s_mutex);
- inc_session_sequence(session);
- mutex_unlock(&session->s_mutex);
-
/* lookup inode */
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
inode = ceph_find_inode(sb, vino);
if (!inode) {
pr_warn("Failed to find inode %llu\n", vino.ino);
- return;
+ goto out;
}
ci = ceph_inode(inode);
@@ -78,6 +76,8 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
spin_unlock(&ci->i_ceph_lock);
iput(inode);
+out:
+ ceph_dec_mds_stopping_blocker(mdsc);
}
static struct ceph_quotarealm_inode *
@@ -1016,6 +1016,9 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
int locked_rwsem = 0;
bool close_sessions = false;
+ if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+ return;
+
/* decode */
if (msg->front.iov_len < sizeof(*h))
goto bad;
@@ -1031,10 +1034,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
mds, ceph_snap_op_name(op), split, trace_len);
- mutex_lock(&session->s_mutex);
- inc_session_sequence(session);
- mutex_unlock(&session->s_mutex);
-
down_write(&mdsc->snap_rwsem);
locked_rwsem = 1;
@@ -1152,12 +1151,15 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
up_write(&mdsc->snap_rwsem);
flush_snaps(mdsc);
+ ceph_dec_mds_stopping_blocker(mdsc);
return;
bad:
pr_err("%s corrupt snap message from mds%d\n", __func__, mds);
ceph_msg_dump(msg);
out:
+ ceph_dec_mds_stopping_blocker(mdsc);
+
if (locked_rwsem)
up_write(&mdsc->snap_rwsem);
@@ -169,6 +169,7 @@ enum {
Opt_wsync,
Opt_pagecache,
Opt_sparseread,
+ Opt_umount_timeout,
};
enum ceph_recover_session_mode {
@@ -214,6 +215,7 @@ static const struct fs_parameter_spec ceph_mount_parameters[] = {
fsparam_flag_no ("wsync", Opt_wsync),
fsparam_flag_no ("pagecache", Opt_pagecache),
fsparam_flag_no ("sparseread", Opt_sparseread),
+ fsparam_u32 ("umount_timeout", Opt_umount_timeout),
{}
};
@@ -600,6 +602,12 @@ static int ceph_parse_mount_param(struct fs_context *fc,
else
fsopt->flags |= CEPH_MOUNT_OPT_SPARSEREAD;
break;
+ case Opt_umount_timeout:
+ /* 0 is "wait forever" (i.e. infinite timeout) */
+ if (result.uint_32 > INT_MAX / 1000)
+ goto out_of_range;
+ fsopt->umount_timeout = msecs_to_jiffies(result.uint_32 * 1000);
+ break;
case Opt_test_dummy_encryption:
#ifdef CONFIG_FS_ENCRYPTION
fscrypt_free_dummy_policy(&fsopt->dummy_enc_policy);
@@ -779,6 +787,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_printf(m, ",readdir_max_entries=%u", fsopt->max_readdir);
if (fsopt->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT)
seq_printf(m, ",readdir_max_bytes=%u", fsopt->max_readdir_bytes);
+ if (fsopt->umount_timeout != CEPH_UMOUNT_TIMEOUT_DEFAULT)
+ seq_printf(m, ",umount_timeout=%u", fsopt->umount_timeout);
if (strcmp(fsopt->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
seq_show_option(m, "snapdirname", fsopt->snapdir_name);
@@ -1456,6 +1466,7 @@ static int ceph_init_fs_context(struct fs_context *fc)
fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
fsopt->congestion_kb = default_congestion_kb();
+ fsopt->umount_timeout = CEPH_UMOUNT_TIMEOUT_DEFAULT;
#ifdef CONFIG_CEPH_FS_POSIX_ACL
fc->sb_flags |= SB_POSIXACL;
@@ -1472,15 +1483,90 @@ static int ceph_init_fs_context(struct fs_context *fc)
return -ENOMEM;
}
+/*
+ * Return true if it successfully increases the blocker counter,
+ * or false if the mdsc is in stopping and flushed state.
+ */
+static bool __inc_stopping_blocker(struct ceph_mds_client *mdsc)
+{
+ spin_lock(&mdsc->stopping_lock);
+ if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) {
+ spin_unlock(&mdsc->stopping_lock);
+ return false;
+ }
+ atomic_inc(&mdsc->stopping_blockers);
+ spin_unlock(&mdsc->stopping_lock);
+ return true;
+}
+
+static void __dec_stopping_blocker(struct ceph_mds_client *mdsc)
+{
+ spin_lock(&mdsc->stopping_lock);
+ if (!atomic_dec_return(&mdsc->stopping_blockers) &&
+ mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
+ complete_all(&mdsc->stopping_waiter);
+ spin_unlock(&mdsc->stopping_lock);
+}
+
+/* For metadata IO requests */
+bool ceph_inc_mds_stopping_blocker(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ mutex_lock(&session->s_mutex);
+ inc_session_sequence(session);
+ mutex_unlock(&session->s_mutex);
+
+ return __inc_stopping_blocker(mdsc);
+}
+
+void ceph_dec_mds_stopping_blocker(struct ceph_mds_client *mdsc)
+{
+ __dec_stopping_blocker(mdsc);
+}
+
static void ceph_kill_sb(struct super_block *s)
{
struct ceph_fs_client *fsc = ceph_sb_to_client(s);
+ struct ceph_mds_client *mdsc = fsc->mdsc;
+ struct ceph_mount_options *opt = fsc->mount_options;
+ bool wait;
dout("kill_sb %p\n", s);
- ceph_mdsc_pre_umount(fsc->mdsc);
+ ceph_mdsc_pre_umount(mdsc);
flush_fs_workqueues(fsc);
+ /*
+ * Though the kill_anon_super() will finally trigger the
+ * sync_filesystem() anyway, we still need to do it here and
+ * then bump the stage of shutdown. This will allow us to
+ * drop any further message, which will increase the inodes'
+ * i_count reference counters but makes no sense any more,
+ * from MDSs.
+ *
+ * Without this when evicting the inodes it may fail in the
+ * kill_anon_super(), which will trigger a warning when
+ * destroying the fscrypt keyring and then possibly trigger
+ * a further crash in ceph module when the iput() tries to
+ * evict the inodes later.
+ */
+ sync_filesystem(s);
+
+ spin_lock(&mdsc->stopping_lock);
+ mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHED;
+ wait = !!atomic_read(&mdsc->stopping_blockers);
+ spin_unlock(&mdsc->stopping_lock);
+
+ if (wait && atomic_read(&mdsc->stopping_blockers)) {
+ long timeleft = wait_for_completion_killable_timeout(
+ &mdsc->stopping_waiter,
+ opt->umount_timeout);
+ if (!timeleft) /* timed out */
+ pr_warn("umount timed out, %ld\n", timeleft);
+ else if (timeleft < 0) /* killed */
+ pr_warn("umount was killed, %ld\n", timeleft);
+ }
+
kill_anon_super(s);
fsc->client->extra_mon_dispatch = NULL;
@@ -66,6 +66,7 @@
#define CEPH_MAX_READDIR_DEFAULT 1024
#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024)
#define CEPH_SNAPDIRNAME_DEFAULT ".snap"
+#define CEPH_UMOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000)
/*
* Delay telling the MDS we no longer want caps, in case we reopen
@@ -87,6 +88,7 @@ struct ceph_mount_options {
int caps_max;
unsigned int max_readdir; /* max readdir result (entries) */
unsigned int max_readdir_bytes; /* max readdir result (bytes) */
+ unsigned int umount_timeout; /* jiffies */
bool new_dev_syntax;
@@ -1413,4 +1415,7 @@ extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc,
struct kstatfs *buf);
extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc);
+bool ceph_inc_mds_stopping_blocker(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session);
+void ceph_dec_mds_stopping_blocker(struct ceph_mds_client *mdsc);
#endif /* _FS_CEPH_SUPER_H */