diff mbox series

[RFC,31/48] dlm_ckv: introduce DLM cluster key-value storage

Message ID 20220803162857.27770-32-d.bogdanov@yadro.com
State New
Headers show
Series Target cluster implementation over DLM | expand

Commit Message

Dmitry Bogdanov Nov. 18, 2021, 7:52 a.m. UTC
Introduce the first version of DLM CKV module that could be used by
different kernel subsystems to share some information in a cluster.

This commit has just cluster level locks.

Signed-off-by: Dmitry Bogdanov <d.bogdanov@yadro.com>
---
 drivers/target/Kconfig   |   6 +
 drivers/target/Makefile  |   2 +
 drivers/target/dlm_ckv.c | 323 +++++++++++++++++++++++++++++++++++++++
 drivers/target/dlm_ckv.h |  19 +++
 4 files changed, 350 insertions(+)
 create mode 100644 drivers/target/dlm_ckv.c
 create mode 100644 drivers/target/dlm_ckv.h
diff mbox series

Patch

diff --git a/drivers/target/Kconfig b/drivers/target/Kconfig
index 72171ea3dd53..75d5e1d23a1c 100644
--- a/drivers/target/Kconfig
+++ b/drivers/target/Kconfig
@@ -35,6 +35,12 @@  config TCM_PSCSI
 	Say Y here to enable the TCM/pSCSI subsystem plugin for non-buffered
 	passthrough access to Linux/SCSI device
 
+config DLM_CKV
+	tristate "Cluster key value storage over DLM"
+	depends on DLM
+	help
+	Say Y here to enable the cluster key value storage over DLM
+
 config TCM_USER2
 	tristate "TCM/USER Subsystem Plugin for Linux"
 	depends on UIO && NET
diff --git a/drivers/target/Makefile b/drivers/target/Makefile
index 45634747377e..8bc9ac2bd629 100644
--- a/drivers/target/Makefile
+++ b/drivers/target/Makefile
@@ -30,3 +30,5 @@  obj-$(CONFIG_LOOPBACK_TARGET)	+= loopback/
 obj-$(CONFIG_TCM_FC)		+= tcm_fc/
 obj-$(CONFIG_ISCSI_TARGET)	+= iscsi/
 obj-$(CONFIG_SBP_TARGET)	+= sbp/
+
+obj-$(CONFIG_DLM_CKV)			+= dlm_ckv.o
diff --git a/drivers/target/dlm_ckv.c b/drivers/target/dlm_ckv.c
new file mode 100644
index 000000000000..a2e1a191c433
--- /dev/null
+++ b/drivers/target/dlm_ckv.c
@@ -0,0 +1,323 @@ 
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+
+#include <asm-generic/errno-base.h>
+#include <linux/kthread.h>
+#include <linux/dlmconstants.h>
+#include <linux/mutex.h>
+#include <linux/dlm.h>
+#include <linux/module.h>
+#include <linux/kthread.h>
+#include <linux/sched.h>
+#include <target/target_core_base.h>
+#include "dlm_ckv.h"
+
+struct dlm_ckv_lksb {
+	struct dlm_lksb lksb;
+	struct completion compl;
+};
+
+struct dlm_ckv_lock {
+	struct dlm_ckv_bucket *bucket;
+	struct dlm_ckv_lksb lksb;
+	char name[DLM_RESNAME_MAXLEN];
+};
+
+struct dlm_ckv_bucket {
+	dlm_lockspace_t *ls;
+	struct kref refcount;
+	u32 local_nodeid;
+	u32 local_slotid;
+	size_t num_nodes;
+	int nodeid[64];
+	void *userarg;
+	struct completion sync_compl;
+};
+
+
+#define DLM_CKV_LVB_SIZE	256
+
+static void bucket_release(struct kref *ref);
+
+/* dlm calls before it does lock recovery */
+
+static void dlm_ckv_recover_prep(void *arg)
+{
+
+}
+
+/* dlm calls after recover_prep has been completed on all lockspace members;
+ * identifies slot/jid of failed member
+ */
+
+static void dlm_ckv_recover_slot(void *arg, struct dlm_slot *slot)
+{
+	pr_info("nodeid %d left the cluster\n", slot->nodeid);
+}
+
+/* dlm calls after recover_slot and after it completes lock recovery */
+
+static void dlm_ckv_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
+			      int our_slot, uint32_t generation)
+{
+	struct dlm_ckv_bucket *bucket = arg;
+	int i;
+
+	for (i = 0; i < num_slots; i++) {
+		bucket->nodeid[i] = slots[i].nodeid;
+		if (slots[i].slot == our_slot)
+			bucket->local_nodeid = slots[i].nodeid;
+	}
+	bucket->local_slotid = our_slot;
+	bucket->num_nodes = num_slots;
+	complete(&bucket->sync_compl);
+}
+
+static const struct dlm_lockspace_ops dlm_ckv_lockspace_ops = {
+	.recover_prep = dlm_ckv_recover_prep,
+	.recover_slot = dlm_ckv_recover_slot,
+	.recover_done = dlm_ckv_recover_done,
+};
+
+static void dlm_ast(void *astarg)
+{
+	struct dlm_ckv_lksb *dlm_ckv_lksb = astarg;
+
+	complete(&dlm_ckv_lksb->compl);
+}
+
+/*
+ * dlm_ckv_cancel - Synchronously cancel a pending dlm_lock() operation
+ */
+static int dlm_ckv_cancel(dlm_lockspace_t *ls, struct dlm_ckv_lksb *lksb,
+			   int flags, const char *name)
+{
+	int res;
+
+	res = dlm_unlock(ls, lksb->lksb.sb_lkid,
+			      DLM_LKF_CANCEL | (flags & DLM_LKF_VALBLK),
+			      &lksb->lksb, lksb);
+	if (res < 0)
+		goto out;
+	res = wait_for_completion_timeout(&lksb->compl, 10 * HZ);
+
+out:
+	return res;
+}
+
+/**
+ * dlm_ckv_lock_wait - Wait until a DLM lock has been granted
+ * @ls:     DLM lock space.
+ * @mode:   DLM lock mode.
+ * @lksb:   DLM lock status block.
+ * @flags:  DLM flags.
+ * @name:   DLM lock name. Only required for non-conversion requests.
+ * @bast:   AST to be invoked in case this lock blocks another one.
+ */
+static int dlm_ckv_lock_wait(dlm_lockspace_t *ls, int mode,
+				struct dlm_ckv_lksb *lksb, int flags,
+				const char *name, void (*bast)(void *, int))
+{
+	int res;
+
+	res = dlm_lock(ls, mode, &lksb->lksb, flags,
+		       (void *)name, name ? strlen(name) : 0, 0,
+		       dlm_ast, lksb, bast);
+	if (res < 0)
+		goto out;
+	res = wait_for_completion_timeout(&lksb->compl, 60 * HZ);
+	if (res > 0)
+		res = lksb->lksb.sb_status;
+	else if (res == 0)
+		res = -ETIMEDOUT;
+	if (res < 0) {
+		int res2 = dlm_ckv_cancel(ls, lksb, flags, name);
+
+		if (res2 < 0)
+			pr_warn("canceling lock %s / %08x failed: %d\n",
+				name ? : "?", lksb->lksb.sb_lkid, res2);
+	}
+
+out:
+	return res;
+}
+
+/*
+ * dlm_ckv_unlock_wait - Release a DLM lock
+ */
+static int dlm_ckv_unlock_wait(dlm_lockspace_t *ls, struct dlm_ckv_lksb *lksb)
+{
+	int res;
+
+	res = dlm_unlock(ls, lksb->lksb.sb_lkid, 0, &lksb->lksb, lksb);
+	if (res < 0)
+		goto out;
+	res = wait_for_completion_timeout(&lksb->compl, 60 * HZ);
+	if (res > 0) {
+		res = lksb->lksb.sb_status;
+		if (res == -DLM_EUNLOCK || res == -DLM_ECANCEL)
+			res = 0;
+	} else if (res == 0) {
+		res = -ETIMEDOUT;
+	}
+
+out:
+	return res;
+}
+
+static void
+dlm_ckv_lock_init(struct dlm_ckv_lock *ckv_lock,
+		  struct dlm_ckv_bucket *bucket,
+		  const char *name)
+{
+	init_completion(&ckv_lock->lksb.compl);
+	strscpy(ckv_lock->name, name, DLM_RESNAME_MAXLEN);
+	ckv_lock->bucket = bucket;
+}
+
+struct dlm_ckv_lock *
+dlm_ckv_create_lock(struct dlm_ckv_bucket *bucket, const char *name)
+{
+	struct dlm_ckv_lock *ckv_lock;
+
+	ckv_lock = kzalloc(sizeof(struct dlm_ckv_lock), GFP_KERNEL);
+	if (!ckv_lock)
+		return NULL;
+
+	kref_get(&bucket->refcount);
+	dlm_ckv_lock_init(ckv_lock, bucket, name);
+
+	return ckv_lock;
+}
+EXPORT_SYMBOL(dlm_ckv_create_lock);
+
+void
+dlm_ckv_free_lock(struct dlm_ckv_lock *ckv_lock)
+{
+	struct dlm_ckv_bucket *bucket = ckv_lock->bucket;
+
+	kfree(ckv_lock);
+
+	kref_put(&bucket->refcount, bucket_release);
+}
+EXPORT_SYMBOL(dlm_ckv_free_lock);
+
+int
+dlm_ckv_lock_get(struct dlm_ckv_lock *ckv_lock)
+{
+	int res;
+
+	BUG_ON(!ckv_lock);
+
+	res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_EX,
+				&ckv_lock->lksb, 0, ckv_lock->name, NULL);
+
+	return res;
+}
+EXPORT_SYMBOL(dlm_ckv_lock_get);
+
+int
+dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock)
+{
+	int res;
+
+	BUG_ON(!ckv_lock);
+
+	res = dlm_ckv_unlock_wait(ckv_lock->bucket->ls, &ckv_lock->lksb);
+
+	return res;
+}
+EXPORT_SYMBOL(dlm_ckv_lock_release);
+
+
+static void bucket_release(struct kref *ref)
+{
+	struct dlm_ckv_bucket *bucket = container_of(ref, struct dlm_ckv_bucket,
+						     refcount);
+	int res;
+
+	res = dlm_release_lockspace(bucket->ls, 2);
+	if (res)
+		pr_err("forcibly releasing lockspace failed: %d\n",
+		       res);
+
+	kfree(bucket);
+}
+
+struct dlm_ckv_bucket *
+dlm_ckv_open_bucket(const char *name, const char *cluster_name, void *userarg)
+{
+	struct dlm_ckv_bucket *bucket;
+	int name_len = strlen(name);
+	int ops_result;
+	int err;
+
+	if (!name)
+		return ERR_PTR(-EINVAL);
+
+	if (name_len > DLM_LOCKSPACE_LEN)
+		return ERR_PTR(-EINVAL);
+
+	bucket = kzalloc(sizeof(struct dlm_ckv_bucket), GFP_KERNEL);
+	kref_init(&bucket->refcount);
+
+	bucket->userarg = userarg;
+	init_completion(&bucket->sync_compl);
+
+	err = dlm_new_lockspace(name, cluster_name,
+				DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_CKV_LVB_SIZE,
+				&dlm_ckv_lockspace_ops, bucket, &ops_result,
+				&bucket->ls);
+	if (err) {
+		pr_err("dlm_new_lockspace error %d\n", err);
+		goto fail_free;
+	}
+
+	if (ops_result < 0) {
+		pr_err("dlm does not support ops callbacks\n");
+		err = -EOPNOTSUPP;
+		goto fail_free;
+	}
+
+	wait_for_completion_timeout(&bucket->sync_compl, 10 * HZ);
+	if (bucket->num_nodes == 0) {
+		pr_err("Cluster joining timed out\n");
+		goto fail_init;
+	}
+
+	return bucket;
+
+fail_init:
+	dlm_release_lockspace(bucket->ls, 2);
+fail_free:
+	kfree(bucket);
+
+	return NULL;
+}
+EXPORT_SYMBOL(dlm_ckv_open_bucket);
+
+int dlm_ckv_close_bucket(struct dlm_ckv_bucket *bucket)
+{
+	kref_put(&bucket->refcount, bucket_release);
+
+	return 0;
+}
+EXPORT_SYMBOL(dlm_ckv_close_bucket);
+
+static int __init dlm_ckv_module_init(void)
+{
+	return 0;
+}
+
+static void __exit dlm_ckv_module_exit(void)
+{
+
+}
+
+MODULE_DESCRIPTION("Cluster KV storage over DLM");
+MODULE_AUTHOR("Dmitry Bogdanov <d.bogdanov@yadro.com>");
+MODULE_LICENSE("GPL");
+
+module_init(dlm_ckv_module_init);
+module_exit(dlm_ckv_module_exit);
diff --git a/drivers/target/dlm_ckv.h b/drivers/target/dlm_ckv.h
new file mode 100644
index 000000000000..1a3f79e42bf6
--- /dev/null
+++ b/drivers/target/dlm_ckv.h
@@ -0,0 +1,19 @@ 
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef DLM_CKV_H
+#define DLM_CKV_H
+
+struct dlm_ckv_bucket;
+struct dlm_ckv_lock;
+
+struct dlm_ckv_bucket *dlm_ckv_open_bucket(const char *name,
+					   const char *cluster_name,
+					   void *userarg);
+int dlm_ckv_close_bucket(struct dlm_ckv_bucket *bucket);
+
+struct dlm_ckv_lock *
+dlm_ckv_create_lock(struct dlm_ckv_bucket *bucket, const char *name);
+void dlm_ckv_free_lock(struct dlm_ckv_lock *ckv_lock);
+int dlm_ckv_lock_get(struct dlm_ckv_lock *ckv_lock);
+int dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock);
+
+#endif