@@ -132,6 +132,7 @@ struct ceph_client {
struct ceph_messenger msgr; /* messenger instance */
struct ceph_mon_client monc;
struct ceph_osd_client osdc;
+ atomic_t have_mon_and_osd_map;
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_dir;
@@ -141,6 +142,25 @@ struct ceph_client {
#endif
};
+/*
+ * The have_mon_and_osd_map possible states
+ */
+enum {
+ CEPH_CLIENT_HAS_NO_MON_AND_NO_OSD_MAP = 0,
+ CEPH_CLIENT_HAS_ONLY_ONE_MAP = 1,
+ CEPH_CLIENT_HAS_MON_AND_OSD_MAP = 2,
+ CEPH_CLIENT_MAP_STATE_UNKNOWN
+};
+
+static inline
+bool is_mon_and_osd_map_state_invalid(struct ceph_client *client)
+{
+ int have_mon_and_osd_map = atomic_read(&client->have_mon_and_osd_map);
+
+ return have_mon_and_osd_map < CEPH_CLIENT_HAS_NO_MON_AND_NO_OSD_MAP ||
+ have_mon_and_osd_map >= CEPH_CLIENT_MAP_STATE_UNKNOWN;
+}
+
#define from_msgr(ms) container_of(ms, struct ceph_client, msgr)
static inline bool ceph_msgr2(struct ceph_client *client)
@@ -723,6 +723,8 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
mutex_init(&client->mount_mutex);
init_waitqueue_head(&client->auth_wq);
+ atomic_set(&client->have_mon_and_osd_map,
+ CEPH_CLIENT_HAS_NO_MON_AND_NO_OSD_MAP);
client->auth_err = 0;
client->extra_mon_dispatch = NULL;
@@ -790,8 +792,8 @@ EXPORT_SYMBOL(ceph_reset_client_addr);
*/
static bool have_mon_and_osd_map(struct ceph_client *client)
{
- return client->monc.monmap && client->monc.monmap->epoch &&
- client->osdc.osdmap && client->osdc.osdmap->epoch;
+ return atomic_read(&client->have_mon_and_osd_map) ==
+ CEPH_CLIENT_HAS_MON_AND_OSD_MAP;
}
/*
@@ -36,8 +36,10 @@ static int monmap_show(struct seq_file *s, void *p)
int i;
struct ceph_client *client = s->private;
+ mutex_lock(&client->monc.mutex);
+
if (client->monc.monmap == NULL)
- return 0;
+ goto out_unlock;
seq_printf(s, "epoch %d\n", client->monc.monmap->epoch);
for (i = 0; i < client->monc.monmap->num_mon; i++) {
@@ -48,6 +50,10 @@ static int monmap_show(struct seq_file *s, void *p)
ENTITY_NAME(inst->name),
ceph_pr_addr(&inst->addr));
}
+
+out_unlock:
+ mutex_unlock(&client->monc.mutex);
+
return 0;
}
@@ -56,13 +62,15 @@ static int osdmap_show(struct seq_file *s, void *p)
int i;
struct ceph_client *client = s->private;
struct ceph_osd_client *osdc = &client->osdc;
- struct ceph_osdmap *map = osdc->osdmap;
+ struct ceph_osdmap *map = NULL;
struct rb_node *n;
+ down_read(&osdc->lock);
+
+ map = osdc->osdmap;
if (map == NULL)
- return 0;
+ goto out_unlock;
- down_read(&osdc->lock);
seq_printf(s, "epoch %u barrier %u flags 0x%x\n", map->epoch,
osdc->epoch_barrier, map->flags);
@@ -131,6 +139,7 @@ static int osdmap_show(struct seq_file *s, void *p)
seq_printf(s, "]\n");
}
+out_unlock:
up_read(&osdc->lock);
return 0;
}
@@ -562,12 +562,16 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
goto out;
}
+ atomic_dec(&client->have_mon_and_osd_map);
+
kfree(monc->monmap);
monc->monmap = monmap;
__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
client->have_fsid = true;
+ atomic_inc(&client->have_mon_and_osd_map);
+
out:
mutex_unlock(&monc->mutex);
wake_up_all(&client->auth_wq);
@@ -1220,6 +1224,9 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
+ atomic_inc(&monc->client->have_mon_and_osd_map);
+ WARN_ON(is_mon_and_osd_map_state_invalid(monc->client));
+
return 0;
out_auth_reply:
@@ -1232,6 +1239,7 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
ceph_auth_destroy(monc->auth);
out_monmap:
kfree(monc->monmap);
+ monc->monmap = NULL;
out:
return err;
}
@@ -1239,6 +1247,8 @@ EXPORT_SYMBOL(ceph_monc_init);
void ceph_monc_stop(struct ceph_mon_client *monc)
{
+ struct ceph_monmap *old_monmap;
+
dout("stop\n");
mutex_lock(&monc->mutex);
@@ -1266,7 +1276,13 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
ceph_msg_put(monc->m_subscribe);
ceph_msg_put(monc->m_subscribe_ack);
- kfree(monc->monmap);
+ mutex_lock(&monc->mutex);
+ WARN_ON(is_mon_and_osd_map_state_invalid(monc->client));
+ atomic_dec(&monc->client->have_mon_and_osd_map);
+ old_monmap = monc->monmap;
+ monc->monmap = NULL;
+ mutex_unlock(&monc->mutex);
+ kfree(old_monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);
@@ -4068,8 +4068,10 @@ static int handle_one_map(struct ceph_osd_client *osdc,
skipped_map = true;
}
+ atomic_dec(&osdc->client->have_mon_and_osd_map);
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap;
+ atomic_inc(&osdc->client->have_mon_and_osd_map);
}
was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
@@ -5266,6 +5268,9 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
schedule_delayed_work(&osdc->osds_timeout_work,
round_jiffies_relative(osdc->client->options->osd_idle_ttl));
+ atomic_inc(&osdc->client->have_mon_and_osd_map);
+ WARN_ON(is_mon_and_osd_map_state_invalid(osdc->client));
+
return 0;
out_notify_wq:
@@ -5278,6 +5283,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
mempool_destroy(osdc->req_mempool);
out_map:
ceph_osdmap_destroy(osdc->osdmap);
+ osdc->osdmap = NULL;
out:
return err;
}
@@ -5306,10 +5312,15 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
WARN_ON(atomic_read(&osdc->num_requests));
WARN_ON(atomic_read(&osdc->num_homeless));
+ down_write(&osdc->lock);
+ WARN_ON(is_mon_and_osd_map_state_invalid(osdc->client));
+ atomic_dec(&osdc->client->have_mon_and_osd_map);
ceph_osdmap_destroy(osdc->osdmap);
+ osdc->osdmap = NULL;
mempool_destroy(osdc->req_mempool);
ceph_msgpool_destroy(&osdc->msgpool_op);
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
+ up_write(&osdc->lock);
}
int osd_req_op_copy_from_init(struct ceph_osd_request *req,