@@ -865,13 +865,6 @@ static int ceph_mknod(struct mnt_idmap *idmap, struct inode *dir,
goto out;
}
- err = ceph_pre_init_acls(dir, &mode, &as_ctx);
- if (err < 0)
- goto out;
- err = ceph_security_init_secctx(dentry, mode, &as_ctx);
- if (err < 0)
- goto out;
-
dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
dir, dentry, mode, rdev);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
@@ -879,6 +872,14 @@ static int ceph_mknod(struct mnt_idmap *idmap, struct inode *dir,
err = PTR_ERR(req);
goto out;
}
+
+ req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
+ if (IS_ERR(req->r_new_inode)) {
+ err = PTR_ERR(req->r_new_inode);
+ req->r_new_inode = NULL;
+ goto out_req;
+ }
+
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_parent = dir;
@@ -888,13 +889,13 @@ static int ceph_mknod(struct mnt_idmap *idmap, struct inode *dir,
req->r_args.mknod.rdev = cpu_to_le32(rdev);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
- if (as_ctx.pagelist) {
- req->r_pagelist = as_ctx.pagelist;
- as_ctx.pagelist = NULL;
- }
+
+ ceph_as_ctx_to_req(req, &as_ctx);
+
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
+out_req:
ceph_mdsc_put_request(req);
out:
if (!err)
@@ -917,6 +918,7 @@ static int ceph_symlink(struct mnt_idmap *idmap, struct inode *dir,
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
struct ceph_mds_request *req;
struct ceph_acl_sec_ctx as_ctx = {};
+ umode_t mode = S_IFLNK | 0777;
int err;
if (ceph_snap(dir) != CEPH_NOSNAP)
@@ -931,21 +933,24 @@ static int ceph_symlink(struct mnt_idmap *idmap, struct inode *dir,
goto out;
}
- err = ceph_security_init_secctx(dentry, S_IFLNK | 0777, &as_ctx);
- if (err < 0)
- goto out;
-
dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}
+
+ req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
+ if (IS_ERR(req->r_new_inode)) {
+ err = PTR_ERR(req->r_new_inode);
+ req->r_new_inode = NULL;
+ goto out_req;
+ }
+
req->r_path2 = kstrdup(dest, GFP_KERNEL);
if (!req->r_path2) {
err = -ENOMEM;
- ceph_mdsc_put_request(req);
- goto out;
+ goto out_req;
}
req->r_parent = dir;
ihold(dir);
@@ -955,13 +960,13 @@ static int ceph_symlink(struct mnt_idmap *idmap, struct inode *dir,
req->r_num_caps = 2;
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
- if (as_ctx.pagelist) {
- req->r_pagelist = as_ctx.pagelist;
- as_ctx.pagelist = NULL;
- }
+
+ ceph_as_ctx_to_req(req, &as_ctx);
+
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
+out_req:
ceph_mdsc_put_request(req);
out:
if (err)
@@ -1002,13 +1007,6 @@ static int ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir,
goto out;
}
- mode |= S_IFDIR;
- err = ceph_pre_init_acls(dir, &mode, &as_ctx);
- if (err < 0)
- goto out;
- err = ceph_security_init_secctx(dentry, mode, &as_ctx);
- if (err < 0)
- goto out;
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req)) {
@@ -1016,6 +1014,14 @@ static int ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir,
goto out;
}
+ mode |= S_IFDIR;
+ req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
+ if (IS_ERR(req->r_new_inode)) {
+ err = PTR_ERR(req->r_new_inode);
+ req->r_new_inode = NULL;
+ goto out_req;
+ }
+
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_parent = dir;
@@ -1024,15 +1030,15 @@ static int ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir,
req->r_args.mkdir.mode = cpu_to_le32(mode);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
- if (as_ctx.pagelist) {
- req->r_pagelist = as_ctx.pagelist;
- as_ctx.pagelist = NULL;
- }
+
+ ceph_as_ctx_to_req(req, &as_ctx);
+
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err &&
!req->r_reply_info.head->is_target &&
!req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
+out_req:
ceph_mdsc_put_request(req);
out:
if (!err)
@@ -604,7 +604,8 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
ceph_mdsc_release_dir_caps(req);
}
-static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
+static int ceph_finish_async_create(struct inode *dir, struct inode *inode,
+ struct dentry *dentry,
struct file *file, umode_t mode,
struct ceph_mds_request *req,
struct ceph_acl_sec_ctx *as_ctx,
@@ -616,7 +617,6 @@ static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
struct ceph_mds_reply_info_in iinfo = { .in = &in };
struct ceph_inode_info *ci = ceph_inode(dir);
struct ceph_dentry_info *di = ceph_dentry(dentry);
- struct inode *inode;
struct timespec64 now;
struct ceph_string *pool_ns;
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
@@ -625,10 +625,6 @@ static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
ktime_get_real_ts64(&now);
- inode = ceph_get_inode(dentry->d_sb, vino);
- if (IS_ERR(inode))
- return PTR_ERR(inode);
-
iinfo.inline_version = CEPH_INLINE_NONE;
iinfo.change_attr = 1;
ceph_encode_timespec64(&iinfo.btime, &now);
@@ -686,8 +682,7 @@ static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
ceph_dir_clear_complete(dir);
if (!d_unhashed(dentry))
d_drop(dentry);
- if (inode->i_state & I_NEW)
- discard_new_inode(inode);
+ discard_new_inode(inode);
} else {
struct dentry *dn;
@@ -733,6 +728,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
+ struct inode *new_inode = NULL;
struct dentry *dn;
struct ceph_acl_sec_ctx as_ctx = {};
bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
@@ -755,15 +751,16 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
*/
flags &= ~O_TRUNC;
+retry:
if (flags & O_CREAT) {
if (ceph_quota_is_max_files_exceeded(dir))
return -EDQUOT;
- err = ceph_pre_init_acls(dir, &mode, &as_ctx);
- if (err < 0)
- return err;
- err = ceph_security_init_secctx(dentry, mode, &as_ctx);
- if (err < 0)
+
+ new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
+ if (IS_ERR(new_inode)) {
+ err = PTR_ERR(new_inode);
goto out_ctx;
+ }
/* Async create can't handle more than a page of xattrs */
if (as_ctx.pagelist &&
!list_is_singular(&as_ctx.pagelist->head))
@@ -772,7 +769,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
/* If it's not being looked up, it's negative */
return -ENOENT;
}
-retry:
+
/* do the open */
req = prepare_open_request(dir->i_sb, flags, mode);
if (IS_ERR(req)) {
@@ -793,32 +790,45 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
- if (as_ctx.pagelist) {
- req->r_pagelist = as_ctx.pagelist;
- as_ctx.pagelist = NULL;
- }
- if (try_async &&
- (req->r_dir_caps =
- try_prep_async_create(dir, dentry, &lo,
- &req->r_deleg_ino))) {
+
+ ceph_as_ctx_to_req(req, &as_ctx);
+
+ if (try_async && (req->r_dir_caps =
+ try_prep_async_create(dir, dentry, &lo, &req->r_deleg_ino))) {
+ struct ceph_vino vino = { .ino = req->r_deleg_ino,
+ .snap = CEPH_NOSNAP };
struct ceph_dentry_info *di = ceph_dentry(dentry);
set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL);
req->r_callback = ceph_async_create_cb;
+ /* Hash inode before RPC */
+ new_inode = ceph_get_inode(dir->i_sb, vino, new_inode);
+ if (IS_ERR(new_inode)) {
+ err = PTR_ERR(new_inode);
+ new_inode = NULL;
+ goto out_req;
+ }
+ WARN_ON_ONCE(!(new_inode->i_state & I_NEW));
+
spin_lock(&dentry->d_lock);
di->flags |= CEPH_DENTRY_ASYNC_CREATE;
spin_unlock(&dentry->d_lock);
err = ceph_mdsc_submit_request(mdsc, dir, req);
if (!err) {
- err = ceph_finish_async_create(dir, dentry,
+ err = ceph_finish_async_create(dir, new_inode, dentry,
file, mode, req,
&as_ctx, &lo);
+ new_inode = NULL;
} else if (err == -EJUKEBOX) {
restore_deleg_ino(dir, req->r_deleg_ino);
ceph_mdsc_put_request(req);
+ discard_new_inode(new_inode);
+ ceph_release_acl_sec_ctx(&as_ctx);
+ memset(&as_ctx, 0, sizeof(as_ctx));
+ new_inode = NULL;
try_async = false;
ceph_put_string(rcu_dereference_raw(lo.pool_ns));
goto retry;
@@ -829,6 +839,8 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
}
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
+ req->r_new_inode = new_inode;
+ new_inode = NULL;
err = ceph_mdsc_do_request(mdsc, (flags & O_CREAT) ? dir : NULL, req);
if (err == -ENOENT) {
dentry = ceph_handle_snapdir(req, dentry);
@@ -869,6 +881,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
}
out_req:
ceph_mdsc_put_request(req);
+ iput(new_inode);
out_ctx:
ceph_release_acl_sec_ctx(&as_ctx);
dout("atomic_open result=%d\n", err);
@@ -52,17 +52,85 @@ static int ceph_set_ino_cb(struct inode *inode, void *data)
return 0;
}
-struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
+/**
+ * ceph_new_inode - allocate a new inode in advance of an expected create
+ * @dir: parent directory for new inode
+ * @dentry: dentry that may eventually point to new inode
+ * @mode: mode of new inode
+ * @as_ctx: pointer to inherited security context
+ *
+ * Allocate a new inode in advance of an operation to create a new inode.
+ * This allocates the inode and sets up the acl_sec_ctx with appropriate
+ * info for the new inode.
+ *
+ * Returns a pointer to the new inode or an ERR_PTR.
+ */
+struct inode *ceph_new_inode(struct inode *dir, struct dentry *dentry,
+ umode_t *mode, struct ceph_acl_sec_ctx *as_ctx)
+{
+ int err;
+ struct inode *inode;
+
+ inode = new_inode(dir->i_sb);
+ if (!inode)
+ return ERR_PTR(-ENOMEM);
+
+ if (!S_ISLNK(*mode)) {
+ err = ceph_pre_init_acls(dir, mode, as_ctx);
+ if (err < 0)
+ goto out_err;
+ }
+
+ err = ceph_security_init_secctx(dentry, *mode, as_ctx);
+ if (err < 0)
+ goto out_err;
+
+ inode->i_state = 0;
+ inode->i_mode = *mode;
+ return inode;
+out_err:
+ iput(inode);
+ return ERR_PTR(err);
+}
+
+void ceph_as_ctx_to_req(struct ceph_mds_request *req, struct ceph_acl_sec_ctx *as_ctx)
+{
+ if (as_ctx->pagelist) {
+ req->r_pagelist = as_ctx->pagelist;
+ as_ctx->pagelist = NULL;
+ }
+}
+
+/**
+ * ceph_get_inode - find or create/hash a new inode
+ * @sb: superblock to search and allocate in
+ * @vino: vino to search for
+ * @newino: optional new inode to insert if one isn't found (may be NULL)
+ *
+ * Search for or insert a new inode into the hash for the given vino, and return a
+ * reference to it. If new is non-NULL, its reference is consumed.
+ */
+struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino, struct inode *newino)
{
struct inode *inode;
if (ceph_vino_is_reserved(vino))
return ERR_PTR(-EREMOTEIO);
- inode = iget5_locked(sb, (unsigned long)vino.ino, ceph_ino_compare,
- ceph_set_ino_cb, &vino);
- if (!inode)
+ if (newino) {
+ inode = inode_insert5(newino, (unsigned long)vino.ino, ceph_ino_compare,
+ ceph_set_ino_cb, &vino);
+ if (inode != newino)
+ iput(newino);
+ } else {
+ inode = iget5_locked(sb, (unsigned long)vino.ino, ceph_ino_compare,
+ ceph_set_ino_cb, &vino);
+ }
+
+ if (!inode) {
+ dout("No inode found for %llx.%llx\n", vino.ino, vino.snap);
return ERR_PTR(-ENOMEM);
+ }
dout("get_inode on %llu=%llx.%llx got %p new %d\n", ceph_present_inode(inode),
ceph_vinop(inode), inode, !!(inode->i_state & I_NEW));
@@ -78,7 +146,7 @@ struct inode *ceph_get_snapdir(struct inode *parent)
.ino = ceph_ino(parent),
.snap = CEPH_SNAPDIR,
};
- struct inode *inode = ceph_get_inode(parent->i_sb, vino);
+ struct inode *inode = ceph_get_inode(parent->i_sb, vino, NULL);
struct ceph_inode_info *ci = ceph_inode(inode);
if (IS_ERR(inode))
@@ -1552,7 +1620,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
vino.ino = le64_to_cpu(rde->inode.in->ino);
vino.snap = le64_to_cpu(rde->inode.in->snapid);
- in = ceph_get_inode(req->r_dentry->d_sb, vino);
+ in = ceph_get_inode(req->r_dentry->d_sb, vino, NULL);
if (IS_ERR(in)) {
err = PTR_ERR(in);
dout("new_inode badness got %d\n", err);
@@ -1754,7 +1822,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
if (d_really_is_positive(dn)) {
in = d_inode(dn);
} else {
- in = ceph_get_inode(parent->d_sb, tvino);
+ in = ceph_get_inode(parent->d_sb, tvino, NULL);
if (IS_ERR(in)) {
dout("new_inode badness\n");
d_drop(dn);
@@ -944,6 +944,7 @@ void ceph_mdsc_release_request(struct kref *kref)
iput(req->r_parent);
}
iput(req->r_target_inode);
+ iput(req->r_new_inode);
if (req->r_dentry)
dput(req->r_dentry);
if (req->r_old_dentry)
@@ -3366,13 +3367,21 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* Must find target inode outside of mutexes to avoid deadlocks */
if ((err >= 0) && rinfo->head->is_target) {
- struct inode *in;
+ struct inode *in = xchg(&req->r_new_inode, NULL);
struct ceph_vino tvino = {
.ino = le64_to_cpu(rinfo->targeti.in->ino),
.snap = le64_to_cpu(rinfo->targeti.in->snapid)
};
- in = ceph_get_inode(mdsc->fsc->sb, tvino);
+ /* If we ended up opening an existing inode, discard r_new_inode */
+ if (req->r_op == CEPH_MDS_OP_CREATE && !req->r_reply_info.has_create_ino) {
+ /* This should never happen on an async create */
+ WARN_ON_ONCE(req->r_deleg_ino);
+ iput(in);
+ in = NULL;
+ }
+
+ in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
if (IS_ERR(in)) {
err = PTR_ERR(in);
mutex_lock(&session->s_mutex);
@@ -263,6 +263,7 @@ struct ceph_mds_request {
struct inode *r_parent; /* parent dir inode */
struct inode *r_target_inode; /* resulting inode */
+ struct inode *r_new_inode; /* new inode (for creates) */
#define CEPH_MDS_R_DIRECT_IS_HASH (1) /* r_direct_hash is valid */
#define CEPH_MDS_R_ABORTED (2) /* call was aborted */
@@ -988,6 +988,7 @@ static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci)
/* inode.c */
struct ceph_mds_reply_info_in;
struct ceph_mds_reply_dirfrag;
+struct ceph_acl_sec_ctx;
extern const struct inode_operations ceph_file_iops;
@@ -995,8 +996,12 @@ extern struct inode *ceph_alloc_inode(struct super_block *sb);
extern void ceph_evict_inode(struct inode *inode);
extern void ceph_free_inode(struct inode *inode);
+struct inode *ceph_new_inode(struct inode *dir, struct dentry *dentry,
+ umode_t *mode, struct ceph_acl_sec_ctx *as_ctx);
+void ceph_as_ctx_to_req(struct ceph_mds_request *req, struct ceph_acl_sec_ctx *as_ctx);
+
extern struct inode *ceph_get_inode(struct super_block *sb,
- struct ceph_vino vino);
+ struct ceph_vino vino, struct inode *newino);
extern struct inode *ceph_get_snapdir(struct inode *parent);
extern int ceph_fill_file_size(struct inode *inode, int issued,
u32 truncate_seq, u64 truncate_size, u64 size);