[API-NEXT,v1,2/3] linux-gen: queue: lock-free implementation

Message ID: 1513782010-27654-3-git-send-email-odpbot@yandex.ru
State: New
Series: [API-NEXT,v1,1/3] api: queue: block-free capabilities

Commit Message

Github ODP bot Dec. 20, 2017, 3 p.m. UTC
From: Petri Savolainen <petri.savolainen@linaro.org>


Add a simple implementation of non-blocking, lock-free plain queues.
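
For context, a minimal usage sketch (not part of this patch; it assumes the
nonblocking queue parameter and plain.lockfree capability fields added by
patch 1/3 of this series):

    odp_queue_capability_t capa;
    odp_queue_param_t param;
    odp_queue_t queue;

    if (odp_queue_capability(&capa) == 0 && capa.plain.lockfree.max_num) {
            odp_queue_param_init(&param);
            param.type        = ODP_QUEUE_TYPE_PLAIN;
            param.nonblocking = ODP_NONBLOCKING_LF;

            /* Enqueue/dequeue on this queue use the lock-free
             * implementation added below. */
            queue = odp_queue_create("queue_lf", &param);
    }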

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 353 (psavol:next-lockfree-queue-impl2)
 ** https://github.com/Linaro/odp/pull/353
 ** Patch: https://github.com/Linaro/odp/pull/353.patch
 ** Base sha: 12fd3a9224a856271934986a1bad981843915d68
 ** Merge commit sha: 291f28b061ad4fa89a0346d6da4c6324c6335eca
 **/
 platform/linux-generic/Makefile.am                 |   2 +
 .../linux-generic/include/odp_queue_internal.h     |   1 +
 platform/linux-generic/include/odp_queue_lf.h      |  36 +++
 platform/linux-generic/odp_queue.c                 |  71 ++++-
 platform/linux-generic/odp_queue_lf.c              | 302 +++++++++++++++++++++
 5 files changed, 398 insertions(+), 14 deletions(-)
 create mode 100644 platform/linux-generic/include/odp_queue_lf.h
 create mode 100644 platform/linux-generic/odp_queue_lf.c

Patch

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 4371e7a99..d4673136a 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -161,6 +161,7 @@  noinst_HEADERS = \
 		  include/odp_queue_scalable_internal.h \
 		  include/odp_ring_internal.h \
 		  include/odp_queue_if.h \
+		  include/odp_queue_lf.h \
 		  include/odp_schedule_if.h \
 		  include/odp_schedule_scalable.h \
 		  include/odp_schedule_scalable_config.h \
@@ -223,6 +224,7 @@  __LIB__libodp_linux_la_SOURCES = \
 			   odp_pool.c \
 			   odp_queue.c \
 			   odp_queue_if.c \
+			   odp_queue_lf.c \
 			   odp_queue_scalable.c \
 			   odp_rwlock.c \
 			   odp_rwlock_recursive.c \
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index dd846d592..26410ac7b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -53,6 +53,7 @@  struct queue_entry_s {
 	odp_queue_param_t param;
 	odp_pktin_queue_t pktin;
 	odp_pktout_queue_t pktout;
+	void             *queue_lf;
 	char              name[ODP_QUEUE_NAME_LEN];
 };
 
diff --git a/platform/linux-generic/include/odp_queue_lf.h b/platform/linux-generic/include/odp_queue_lf.h
new file mode 100644
index 000000000..34ea48737
--- /dev/null
+++ b/platform/linux-generic/include/odp_queue_lf.h
@@ -0,0 +1,36 @@ 
+/* Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#ifndef ODP_QUEUE_LF_H_
+#define ODP_QUEUE_LF_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp_queue_if.h>
+#include <odp_queue_internal.h>
+
+/* Lock-free queue functions */
+typedef struct {
+	queue_enq_fn_t enq;
+	queue_enq_multi_fn_t enq_multi;
+	queue_deq_fn_t deq;
+	queue_deq_multi_fn_t deq_multi;
+
+} queue_lf_func_t;
+
+uint32_t queue_lf_init_global(uint32_t *queue_lf_size,
+			      queue_lf_func_t *lf_func);
+void queue_lf_term_global(void);
+void *queue_lf_create(queue_entry_t *queue);
+void queue_lf_destroy(void *queue_lf);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 6a69eb849..7cb60562b 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -8,6 +8,7 @@ 
 
 #include <odp/api/queue.h>
 #include <odp_queue_internal.h>
+#include <odp_queue_lf.h>
 #include <odp_queue_if.h>
 #include <odp/api/std_types.h>
 #include <odp/api/align.h>
@@ -39,11 +40,16 @@ 
 static int queue_init(queue_entry_t *queue, const char *name,
 		      const odp_queue_param_t *param);
 
-typedef struct queue_table_t {
-	queue_entry_t  queue[ODP_CONFIG_QUEUES];
-} queue_table_t;
+typedef struct queue_global_t {
+	queue_entry_t   queue[ODP_CONFIG_QUEUES];
 
-static queue_table_t *queue_tbl;
+	uint32_t        queue_lf_num;
+	uint32_t        queue_lf_size;
+	queue_lf_func_t queue_lf_func;
+
+} queue_global_t;
+
+static queue_global_t *queue_glb;
 
 static
 queue_entry_t *get_qentry(uint32_t queue_id);
@@ -64,26 +70,28 @@  static inline odp_queue_t queue_from_id(uint32_t queue_id)
 static
 queue_entry_t *get_qentry(uint32_t queue_id)
 {
-	return &queue_tbl->queue[queue_id];
+	return &queue_glb->queue[queue_id];
 }
 
 static int queue_init_global(void)
 {
 	uint32_t i;
 	odp_shm_t shm;
+	uint32_t lf_size = 0;
+	queue_lf_func_t *lf_func;
 
 	ODP_DBG("Queue init ... ");
 
 	shm = odp_shm_reserve("odp_queues",
-			      sizeof(queue_table_t),
+			      sizeof(queue_global_t),
 			      sizeof(queue_entry_t), 0);
 
-	queue_tbl = odp_shm_addr(shm);
+	queue_glb = odp_shm_addr(shm);
 
-	if (queue_tbl == NULL)
+	if (queue_glb == NULL)
 		return -1;
 
-	memset(queue_tbl, 0, sizeof(queue_table_t));
+	memset(queue_glb, 0, sizeof(queue_global_t));
 
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
 		/* init locks */
@@ -93,6 +101,10 @@  static int queue_init_global(void)
 		queue->s.handle = queue_from_id(i);
 	}
 
+	lf_func = &queue_glb->queue_lf_func;
+	queue_glb->queue_lf_num  = queue_lf_init_global(&lf_size, lf_func);
+	queue_glb->queue_lf_size = lf_size;
+
 	ODP_DBG("done\n");
 	ODP_DBG("Queue init global\n");
 	ODP_DBG("  struct queue_entry_s size %zu\n",
@@ -122,7 +134,7 @@  static int queue_term_global(void)
 	int i;
 
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-		queue = &queue_tbl->queue[i];
+		queue = &queue_glb->queue[i];
 		LOCK(&queue->s.lock);
 		if (queue->s.status != QUEUE_STATUS_FREE) {
 			ODP_ERR("Not destroyed queue: %s\n", queue->s.name);
@@ -131,6 +143,8 @@  static int queue_term_global(void)
 		UNLOCK(&queue->s.lock);
 	}
 
+	queue_lf_term_global();
+
 	ret = odp_shm_free(odp_shm_lookup("odp_queues"));
 	if (ret < 0) {
 		ODP_ERR("shm free failed for odp_queues");
@@ -151,6 +165,8 @@  static int queue_capability(odp_queue_capability_t *capa)
 	capa->sched_prios       = odp_schedule_num_prio();
 	capa->plain.max_num     = capa->max_queues;
 	capa->sched.max_num     = capa->max_queues;
+	capa->plain.lockfree.max_num  = queue_glb->queue_lf_num;
+	capa->plain.lockfree.max_size = queue_glb->queue_lf_size;
 
 	return 0;
 }
@@ -188,6 +204,7 @@  static odp_queue_t queue_create(const char *name,
 {
 	uint32_t i;
 	queue_entry_t *queue;
+	void *queue_lf;
 	odp_queue_t handle = ODP_QUEUE_INVALID;
 	odp_queue_type_t type = ODP_QUEUE_TYPE_PLAIN;
 	odp_queue_param_t default_param;
@@ -198,7 +215,7 @@  static odp_queue_t queue_create(const char *name,
 	}
 
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-		queue = &queue_tbl->queue[i];
+		queue = &queue_glb->queue[i];
 
 		if (queue->s.status != QUEUE_STATUS_FREE)
 			continue;
@@ -207,7 +224,26 @@  static odp_queue_t queue_create(const char *name,
 		if (queue->s.status == QUEUE_STATUS_FREE) {
 			if (queue_init(queue, name, param)) {
 				UNLOCK(&queue->s.lock);
-				return handle;
+				return ODP_QUEUE_INVALID;
+			}
+
+			if (param->nonblocking == ODP_NONBLOCKING_LF) {
+				queue_lf_func_t *lf_func;
+
+				lf_func = &queue_glb->queue_lf_func;
+
+				queue_lf = queue_lf_create(queue);
+
+				if (queue_lf == NULL) {
+					UNLOCK(&queue->s.lock);
+					return ODP_QUEUE_INVALID;
+				}
+				queue->s.queue_lf = queue_lf;
+
+				queue->s.enqueue       = lf_func->enq;
+				queue->s.enqueue_multi = lf_func->enq_multi;
+				queue->s.dequeue       = lf_func->deq;
+				queue->s.dequeue_multi = lf_func->deq_multi;
 			}
 
 			type = queue->s.type;
@@ -224,7 +260,10 @@  static odp_queue_t queue_create(const char *name,
 		UNLOCK(&queue->s.lock);
 	}
 
-	if (handle != ODP_QUEUE_INVALID && type == ODP_QUEUE_TYPE_SCHED) {
+	if (handle == ODP_QUEUE_INVALID)
+		return ODP_QUEUE_INVALID;
+
+	if (type == ODP_QUEUE_TYPE_SCHED) {
 		if (sched_fn->init_queue(queue->s.index,
 					 &queue->s.param.sched)) {
 			queue->s.status = QUEUE_STATUS_FREE;
@@ -289,6 +328,10 @@  static int queue_destroy(odp_queue_t handle)
 	default:
 		ODP_ABORT("Unexpected queue status\n");
 	}
+
+	if (queue->s.param.nonblocking == ODP_NONBLOCKING_LF)
+		queue_lf_destroy(queue->s.queue_lf);
+
 	UNLOCK(&queue->s.lock);
 
 	return 0;
@@ -313,7 +356,7 @@  static odp_queue_t queue_lookup(const char *name)
 	uint32_t i;
 
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-		queue_entry_t *queue = &queue_tbl->queue[i];
+		queue_entry_t *queue = &queue_glb->queue[i];
 
 		if (queue->s.status == QUEUE_STATUS_FREE ||
 		    queue->s.status == QUEUE_STATUS_DESTROYED)
diff --git a/platform/linux-generic/odp_queue_lf.c b/platform/linux-generic/odp_queue_lf.c
new file mode 100644
index 000000000..756422cf3
--- /dev/null
+++ b/platform/linux-generic/odp_queue_lf.c
@@ -0,0 +1,302 @@ 
+/* Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp/api/queue.h>
+#include <odp/api/atomic.h>
+#include <odp/api/shared_memory.h>
+#include <odp_queue_lf.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "config.h"
+#include <odp_debug_internal.h>
+
+#define LF_NODE_EMPTY   0
+#define LF_NODE_DATA    1
+#define LF_NODE_MARK    2
+#define RING_LF_SIZE   64
+#define QUEUE_LF_NUM  128
+
+/* Node in lock-free ring */
+typedef union {
+	odp_atomic_u64_t atomic_u64;
+
+	struct {
+		/* marker == DATA */
+		uint64_t marker:   2;
+
+		/* Max 62 bits stored from a pointer */
+		uint64_t ptr:     62;
+	} data;
+
+	struct {
+		/* marker == EMPTY or MARK */
+		uint64_t marker:   2;
+
+		/* Counter to avoid ABA issues */
+		uint64_t counter: 62;
+	};
+
+	uint64_t u64;
+
+} ring_lf_node_t;
+
+/* Lock-free ring */
+typedef struct {
+	ring_lf_node_t node[RING_LF_SIZE];
+
+	int used;
+	odp_atomic_u64_t aba_counter;
+
+} queue_lf_t ODP_ALIGNED_CACHE;
+
+/* Lock-free queue globals */
+typedef struct {
+	queue_lf_t queue_lf[QUEUE_LF_NUM];
+
+	odp_shm_t shm;
+
+} queue_lf_global_t ODP_ALIGNED_CACHE;
+
+static queue_lf_global_t *queue_lf_glb;
+
+static inline int next_idx(int idx)
+{
+	int next = idx + 1;
+
+	if (next == RING_LF_SIZE)
+		next = 0;
+
+	return next;
+}
+
+static inline int next_is_tail(ring_lf_node_t node, ring_lf_node_t next,
+			       ring_lf_node_t next_next)
+{
+	if ((node.marker == LF_NODE_MARK || node.marker == LF_NODE_DATA) &&
+	    next.marker == LF_NODE_EMPTY)
+		return 1;
+
+	if (node.marker == LF_NODE_DATA && next.marker == LF_NODE_MARK &&
+	    next_next.marker == LF_NODE_MARK)
+		return 1;
+
+	return 0;
+}
+
+static inline int next_is_head(ring_lf_node_t node, ring_lf_node_t next)
+{
+	if ((node.marker == LF_NODE_MARK && next.marker == LF_NODE_DATA))
+		return 1;
+
+	return 0;
+}
+
+static int queue_lf_enq(queue_t q_int, odp_buffer_hdr_t *buf_hdr)
+{
+	queue_entry_t *queue;
+	queue_lf_t *queue_lf;
+	int i, i_next, i_nn;
+	ring_lf_node_t node_val;
+	ring_lf_node_t next_val;
+	ring_lf_node_t nn_val;
+	ring_lf_node_t new_val;
+	ring_lf_node_t *node;
+	ring_lf_node_t *next;
+	ring_lf_node_t *nn;
+	uint64_t cur_val;
+
+	queue    = qentry_from_int(q_int);
+	queue_lf = queue->s.queue_lf;
+
+	for (i = 0; i < RING_LF_SIZE; i++) {
+		i_next = next_idx(i);
+		i_nn   = next_idx(i_next);
+
+		node = &queue_lf->node[i];
+		next = &queue_lf->node[i_next];
+		nn   = &queue_lf->node[i_nn];
+		node_val.u64 = odp_atomic_load_u64(&node->atomic_u64);
+		next_val.u64 = odp_atomic_load_u64(&next->atomic_u64);
+		nn_val.u64   = odp_atomic_load_u64(&nn->atomic_u64);
+
+		if (!next_is_tail(node_val, next_val, nn_val))
+			continue;
+
+		/* Next node is the tail. Replace it with data. */
+		new_val.data.marker = LF_NODE_DATA;
+		new_val.data.ptr    = ((uintptr_t)buf_hdr) >> 2;
+
+		cur_val = next_val.u64;
+
+		if (odp_atomic_cas_rel_u64(&next->atomic_u64, &cur_val,
+					   new_val.u64))
+			return 0;
+	}
+
+	return -1;
+}
+
+static int queue_lf_enq_multi(queue_t q_int, odp_buffer_hdr_t **buf_hdr,
+			      int num)
+{
+	(void)num;
+
+	if (queue_lf_enq(q_int, buf_hdr[0]) == 0)
+		return 1;
+
+	return 0;
+}
+
+static odp_buffer_hdr_t *queue_lf_deq(queue_t q_int)
+{
+	queue_entry_t *queue;
+	queue_lf_t *queue_lf;
+	int i, i_next, i_nn;
+	ring_lf_node_t node_val;
+	ring_lf_node_t next_val;
+	ring_lf_node_t nn_val;
+	ring_lf_node_t new_val;
+	ring_lf_node_t *node;
+	ring_lf_node_t *next;
+	ring_lf_node_t *nn;
+	uint64_t counter;
+
+	queue    = qentry_from_int(q_int);
+	queue_lf = queue->s.queue_lf;
+
+	for (i = 0; i < RING_LF_SIZE; i++) {
+retry:
+		i_next = next_idx(i);
+
+		node = &queue_lf->node[i];
+		next = &queue_lf->node[i_next];
+		node_val.u64 = odp_atomic_load_u64(&node->atomic_u64);
+		next_val.u64 = odp_atomic_load_u64(&next->atomic_u64);
+
+		if (!next_is_head(node_val, next_val))
+			continue;
+
+		/* Next node is the head.*/
+		i_nn       = next_idx(i_next);
+		nn         = &queue_lf->node[i_nn];
+		nn_val.u64 = odp_atomic_load_u64(&nn->atomic_u64);
+
+		counter = odp_atomic_fetch_inc_u64(&queue_lf->aba_counter);
+		new_val.counter = counter;
+
+		/* Cannot replace next with marker, as long as next-next is
+		 * also a marker. Otherwise, the ring would be full of markers.
+		 * Empty it and retry. */
+		if (nn_val.marker == LF_NODE_MARK) {
+			new_val.marker = LF_NODE_EMPTY;
+			odp_atomic_cas_u64(&nn->atomic_u64, &nn_val.u64,
+					   new_val.u64);
+			goto retry;
+		}
+
+		new_val.marker = LF_NODE_MARK;
+
+		if (odp_atomic_cas_acq_u64(&next->atomic_u64, &next_val.u64,
+					   new_val.u64)) {
+			/* Successfully replaced data with marker. */
+			return (void *)(uintptr_t)(next_val.data.ptr << 2);
+		}
+	}
+
+	return NULL;
+}
+
+static int queue_lf_deq_multi(queue_t q_int, odp_buffer_hdr_t **buf_hdr,
+			      int num)
+{
+	odp_buffer_hdr_t *buf;
+
+	(void)num;
+
+	buf = queue_lf_deq(q_int);
+
+	if (buf == NULL)
+		return 0;
+
+	buf_hdr[0] = buf;
+	return 1;
+}
+
+uint32_t queue_lf_init_global(uint32_t *queue_lf_size,
+			      queue_lf_func_t *lf_func)
+{
+	odp_shm_t shm;
+	odp_atomic_op_t lockfree;
+
+	odp_atomic_lock_free_u64(&lockfree);
+
+	ODP_DBG("\nLock-free queue init\n");
+	ODP_DBG("  u64 lock-free: %i\n\n", lockfree.op.cas);
+
+	if (lockfree.op.cas == 0)
+		return 0;
+
+	shm = odp_shm_reserve("odp_queues_lf",
+			      sizeof(queue_lf_global_t),
+			      ODP_CACHE_LINE_SIZE, 0);
+
+	queue_lf_glb = odp_shm_addr(shm);
+	memset(queue_lf_glb, 0, sizeof(queue_lf_global_t));
+
+	queue_lf_glb->shm = shm;
+
+	memset(lf_func, 0, sizeof(queue_lf_func_t));
+	lf_func->enq       = queue_lf_enq;
+	lf_func->enq_multi = queue_lf_enq_multi;
+	lf_func->deq       = queue_lf_deq;
+	lf_func->deq_multi = queue_lf_deq_multi;
+
+	*queue_lf_size = RING_LF_SIZE - 1;
+
+	return QUEUE_LF_NUM;
+}
+
+void queue_lf_term_global(void)
+{
+	odp_shm_t shm;
+
+	if (queue_lf_glb == NULL)
+		return;
+
+	shm = queue_lf_glb->shm;
+
+	if (odp_shm_free(shm) < 0)
+		ODP_ERR("shm free failed");
+}
+
+void *queue_lf_create(queue_entry_t *queue)
+{
+	int i;
+	queue_lf_t *queue_lf = NULL;
+
+	if (queue->s.type != ODP_QUEUE_TYPE_PLAIN)
+		return NULL;
+
+	for (i = 0; i < QUEUE_LF_NUM; i++) {
+		if (queue_lf_glb->queue_lf[i].used == 0) {
+			queue_lf = &queue_lf_glb->queue_lf[i];
+			memset(queue_lf, 0, sizeof(queue_lf_t));
+			queue_lf->node[0].marker = LF_NODE_MARK;
+			queue_lf->used = 1;
+			break;
+		}
+	}
+
+	return queue_lf;
+}
+
+void queue_lf_destroy(void *queue_lf_ptr)
+{
+	queue_lf_t *queue_lf = queue_lf_ptr;
+
+	queue_lf->used = 0;
+}
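
For reference, the 2-bit marker / 62-bit pointer packing used by
ring_lf_node_t above can be exercised in isolation with a small standalone
sketch (hypothetical helper names, not part of the patch; it assumes the
usual bitfield layout where marker occupies the low bits, and relies on
buffer header pointers being at least 4-byte aligned so that the two bits
dropped by the >> 2 shift are always zero):

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>

    #define LF_NODE_DATA 1

    /* Pack a marker and a 4-byte aligned pointer into one 64-bit word,
     * mirroring the marker:2 / ptr:62 bitfields of ring_lf_node_t. */
    static uint64_t pack_data_node(void *ptr)
    {
            uint64_t addr = (uintptr_t)ptr;

            /* Alignment guarantees that the two low bits are zero,
             * so shifting them out is lossless. */
            assert((addr & 0x3) == 0);

            return ((addr >> 2) << 2) | LF_NODE_DATA;
    }

    static void *unpack_data_node(uint64_t node)
    {
            /* Drop the 2-bit marker and restore the zeroed low bits. */
            return (void *)(uintptr_t)((node >> 2) << 2);
    }

    int main(void)
    {
            static uint64_t buf; /* stand-in for an odp_buffer_hdr_t */
            uint64_t node = pack_data_node(&buf);

            printf("marker %u, round-trip ok: %d\n",
                   (unsigned int)(node & 0x3),
                   unpack_data_node(node) == (void *)&buf);
            return 0;
    }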