@@ -160,6 +160,7 @@ noinst_HEADERS = \
include/odp_queue_scalable_internal.h \
include/odp_ring_internal.h \
include/odp_queue_if.h \
+ include/odp_queue_lf.h \
include/odp_schedule_if.h \
include/odp_schedule_scalable.h \
include/odp_schedule_scalable_config.h \
@@ -220,6 +221,7 @@ __LIB__libodp_linux_la_SOURCES = \
odp_pool.c \
odp_queue.c \
odp_queue_if.c \
+ odp_queue_lf.c \
odp_queue_scalable.c \
odp_rwlock.c \
odp_rwlock_recursive.c \
@@ -53,6 +53,7 @@ struct queue_entry_s {
odp_queue_param_t param;
odp_pktin_queue_t pktin;
odp_pktout_queue_t pktout;
+ void *queue_lf;
char name[ODP_QUEUE_NAME_LEN];
};
new file mode 100644
@@ -0,0 +1,36 @@
+/* Copyright (c) 2018, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_QUEUE_LF_H_
+#define ODP_QUEUE_LF_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp_queue_if.h>
+#include <odp_queue_internal.h>
+
+/* Lock-free queue functions */
+typedef struct {
+ queue_enq_fn_t enq;
+ queue_enq_multi_fn_t enq_multi;
+ queue_deq_fn_t deq;
+ queue_deq_multi_fn_t deq_multi;
+
+} queue_lf_func_t;
+
+uint32_t queue_lf_init_global(uint32_t *queue_lf_size,
+ queue_lf_func_t *lf_func);
+void queue_lf_term_global(void);
+void *queue_lf_create(queue_entry_t *queue);
+void queue_lf_destroy(void *queue_lf);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
@@ -8,6 +8,7 @@
#include <odp/api/queue.h>
#include <odp_queue_internal.h>
+#include <odp_queue_lf.h>
#include <odp_queue_if.h>
#include <odp/api/std_types.h>
#include <odp/api/align.h>
@@ -39,11 +40,16 @@
static int queue_init(queue_entry_t *queue, const char *name,
const odp_queue_param_t *param);
-typedef struct queue_table_t {
- queue_entry_t queue[ODP_CONFIG_QUEUES];
-} queue_table_t;
+typedef struct queue_global_t {
+ queue_entry_t queue[ODP_CONFIG_QUEUES];
-static queue_table_t *queue_tbl;
+ uint32_t queue_lf_num;
+ uint32_t queue_lf_size;
+ queue_lf_func_t queue_lf_func;
+
+} queue_global_t;
+
+static queue_global_t *queue_glb;
static
queue_entry_t *get_qentry(uint32_t queue_id);
@@ -64,26 +70,28 @@ static inline odp_queue_t queue_from_id(uint32_t queue_id)
static
queue_entry_t *get_qentry(uint32_t queue_id)
{
- return &queue_tbl->queue[queue_id];
+ return &queue_glb->queue[queue_id];
}
static int queue_init_global(void)
{
uint32_t i;
odp_shm_t shm;
+ uint32_t lf_size = 0;
+ queue_lf_func_t *lf_func;
ODP_DBG("Queue init ... ");
shm = odp_shm_reserve("odp_queues",
- sizeof(queue_table_t),
+ sizeof(queue_global_t),
sizeof(queue_entry_t), 0);
- queue_tbl = odp_shm_addr(shm);
+ queue_glb = odp_shm_addr(shm);
- if (queue_tbl == NULL)
+ if (queue_glb == NULL)
return -1;
- memset(queue_tbl, 0, sizeof(queue_table_t));
+ memset(queue_glb, 0, sizeof(queue_global_t));
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
/* init locks */
@@ -93,6 +101,10 @@ static int queue_init_global(void)
queue->s.handle = queue_from_id(i);
}
+ lf_func = &queue_glb->queue_lf_func;
+ queue_glb->queue_lf_num = queue_lf_init_global(&lf_size, lf_func);
+ queue_glb->queue_lf_size = lf_size;
+
ODP_DBG("done\n");
ODP_DBG("Queue init global\n");
ODP_DBG(" struct queue_entry_s size %zu\n",
@@ -122,7 +134,7 @@ static int queue_term_global(void)
int i;
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
- queue = &queue_tbl->queue[i];
+ queue = &queue_glb->queue[i];
LOCK(&queue->s.lock);
if (queue->s.status != QUEUE_STATUS_FREE) {
ODP_ERR("Not destroyed queue: %s\n", queue->s.name);
@@ -131,6 +143,8 @@ static int queue_term_global(void)
UNLOCK(&queue->s.lock);
}
+ queue_lf_term_global();
+
ret = odp_shm_free(odp_shm_lookup("odp_queues"));
if (ret < 0) {
ODP_ERR("shm free failed for odp_queues");
@@ -151,6 +165,8 @@ static int queue_capability(odp_queue_capability_t *capa)
capa->sched_prios = odp_schedule_num_prio();
capa->plain.max_num = capa->max_queues;
capa->sched.max_num = capa->max_queues;
+ capa->plain.lockfree.max_num = queue_glb->queue_lf_num;
+ capa->plain.lockfree.max_size = queue_glb->queue_lf_size;
return 0;
}
@@ -188,6 +204,7 @@ static odp_queue_t queue_create(const char *name,
{
uint32_t i;
queue_entry_t *queue;
+ void *queue_lf;
odp_queue_t handle = ODP_QUEUE_INVALID;
odp_queue_type_t type = ODP_QUEUE_TYPE_PLAIN;
odp_queue_param_t default_param;
@@ -198,7 +215,7 @@ static odp_queue_t queue_create(const char *name,
}
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
- queue = &queue_tbl->queue[i];
+ queue = &queue_glb->queue[i];
if (queue->s.status != QUEUE_STATUS_FREE)
continue;
@@ -207,7 +224,26 @@ static odp_queue_t queue_create(const char *name,
if (queue->s.status == QUEUE_STATUS_FREE) {
if (queue_init(queue, name, param)) {
UNLOCK(&queue->s.lock);
- return handle;
+ return ODP_QUEUE_INVALID;
+ }
+
+ if (param->nonblocking == ODP_NONBLOCKING_LF) {
+ queue_lf_func_t *lf_func;
+
+ lf_func = &queue_glb->queue_lf_func;
+
+ queue_lf = queue_lf_create(queue);
+
+ if (queue_lf == NULL) {
+ UNLOCK(&queue->s.lock);
+ return ODP_QUEUE_INVALID;
+ }
+ queue->s.queue_lf = queue_lf;
+
+ queue->s.enqueue = lf_func->enq;
+ queue->s.enqueue_multi = lf_func->enq_multi;
+ queue->s.dequeue = lf_func->deq;
+ queue->s.dequeue_multi = lf_func->deq_multi;
}
type = queue->s.type;
@@ -224,7 +260,10 @@ static odp_queue_t queue_create(const char *name,
UNLOCK(&queue->s.lock);
}
- if (handle != ODP_QUEUE_INVALID && type == ODP_QUEUE_TYPE_SCHED) {
+ if (handle == ODP_QUEUE_INVALID)
+ return ODP_QUEUE_INVALID;
+
+ if (type == ODP_QUEUE_TYPE_SCHED) {
if (sched_fn->init_queue(queue->s.index,
&queue->s.param.sched)) {
queue->s.status = QUEUE_STATUS_FREE;
@@ -289,6 +328,10 @@ static int queue_destroy(odp_queue_t handle)
default:
ODP_ABORT("Unexpected queue status\n");
}
+
+ if (queue->s.param.nonblocking == ODP_NONBLOCKING_LF)
+ queue_lf_destroy(queue->s.queue_lf);
+
UNLOCK(&queue->s.lock);
return 0;
@@ -313,7 +356,7 @@ static odp_queue_t queue_lookup(const char *name)
uint32_t i;
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
- queue_entry_t *queue = &queue_tbl->queue[i];
+ queue_entry_t *queue = &queue_glb->queue[i];
if (queue->s.status == QUEUE_STATUS_FREE ||
queue->s.status == QUEUE_STATUS_DESTROYED)
new file mode 100644
@@ -0,0 +1,346 @@
+/* Copyright (c) 2018, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <odp/api/queue.h>
+#include <odp/api/atomic.h>
+#include <odp/api/shared_memory.h>
+#include <odp_queue_lf.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "config.h"
+#include <odp_debug_internal.h>
+
+#define RING_LF_SIZE 32
+#define QUEUE_LF_NUM 128
+#define ENQ_RETRIES (RING_LF_SIZE / 4)
+#define DEQ_RETRIES (RING_LF_SIZE / 8)
+
+#ifdef __SIZEOF_INT128__
+
+typedef unsigned __int128 u128_t;
+
+static inline u128_t atomic_load_u128(u128_t *atomic)
+{
+ return __atomic_load_n(atomic, __ATOMIC_RELAXED);
+}
+
+static inline void atomic_zero_u128(u128_t *atomic)
+{
+ __atomic_store_n(atomic, 0, __ATOMIC_RELAXED);
+}
+
+static inline int atomic_cas_rel_u128(u128_t *atomic, u128_t old_val,
+ u128_t new_val)
+{
+ return __atomic_compare_exchange_n(atomic, &old_val, new_val,
+ 0 /* strong */,
+ __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+}
+
+static inline int atomic_cas_acq_u128(u128_t *atomic, u128_t old_val,
+ u128_t new_val)
+{
+ return __atomic_compare_exchange_n(atomic, &old_val, new_val,
+ 0 /* strong */,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_RELAXED);
+}
+
+#else
+
+/* These definitions enable build in non 128 bit compatible systems.
+ * Implementation is active only when 128 bit lockfree atomics are available.
+ * So, these are never actually used. */
+typedef struct {
+ uint64_t u64[2];
+} u128_t ODP_ALIGNED(16);
+
+static inline u128_t atomic_load_u128(u128_t *atomic)
+{
+ return *atomic;
+}
+
+static inline void atomic_zero_u128(u128_t *atomic)
+{
+ atomic->u64[0] = 0;
+ atomic->u64[1] = 0;
+}
+
+static inline int atomic_cas_rel_u128(u128_t *atomic, u128_t old_val,
+ u128_t new_val)
+{
+ (void)old_val;
+ *atomic = new_val;
+ return 1;
+}
+
+static inline int atomic_cas_acq_u128(u128_t *atomic, u128_t old_val,
+ u128_t new_val)
+{
+ return atomic_cas_rel_u128(atomic, old_val, new_val);
+}
+
+#endif
+
+/* Node in lock-free ring */
+typedef union {
+ u128_t u128;
+
+ struct {
+ /* 0: empty, 1: data */
+ uint64_t mark: 1;
+
+ /* A cache aligned pointer fits into 63 bits, since the least
+ * significant bits are zero. */
+ uint64_t ptr: 63;
+
+ /* Data with lowest counter value is the head. */
+ uint64_t count;
+ } s;
+
+} ring_lf_node_t;
+
+/* Lock-free ring */
+typedef struct {
+ ring_lf_node_t node[RING_LF_SIZE];
+ int used;
+ odp_atomic_u64_t enq_counter;
+
+} queue_lf_t ODP_ALIGNED_CACHE;
+
+/* Lock-free queue globals */
+typedef struct {
+ queue_lf_t queue_lf[QUEUE_LF_NUM];
+ odp_shm_t shm;
+
+} queue_lf_global_t ODP_ALIGNED_CACHE;
+
+static queue_lf_global_t *queue_lf_glb;
+
+static inline int next_idx(int idx)
+{
+ int next = idx + 1;
+
+ if (next == RING_LF_SIZE)
+ next = 0;
+
+ return next;
+}
+
+static int queue_lf_enq(queue_t q_int, odp_buffer_hdr_t *buf_hdr)
+{
+ queue_entry_t *queue;
+ queue_lf_t *queue_lf;
+ int i, j, i_node;
+ int found;
+ ring_lf_node_t node_val;
+ ring_lf_node_t new_val;
+ ring_lf_node_t *node;
+ uint64_t counter;
+
+ queue = qentry_from_int(q_int);
+ queue_lf = queue->s.queue_lf;
+
+ i_node = 0;
+
+ counter = odp_atomic_fetch_inc_u64(&queue_lf->enq_counter);
+
+ for (j = 0; j < ENQ_RETRIES; j++) {
+ found = 0;
+
+ /* Find empty node */
+ for (i = 0; i < RING_LF_SIZE; i++) {
+ i_node = next_idx(i_node);
+ node = &queue_lf->node[i_node];
+ node_val.u128 = atomic_load_u128(&node->u128);
+
+ if (node_val.s.mark == 0) {
+ found = 1;
+ break;
+ }
+ }
+
+ /* Queue is full */
+ if (found == 0)
+ return -1;
+
+ /* Try to insert data */
+ new_val.s.mark = 1;
+ new_val.s.count = counter;
+ new_val.s.ptr = ((uintptr_t)buf_hdr) >> 1;
+
+ if (atomic_cas_rel_u128(&node->u128, node_val.u128,
+ new_val.u128))
+ return 0;
+ }
+
+ return -1;
+}
+
+static int queue_lf_enq_multi(queue_t q_int, odp_buffer_hdr_t **buf_hdr,
+ int num)
+{
+ (void)num;
+
+ if (queue_lf_enq(q_int, buf_hdr[0]) == 0)
+ return 1;
+
+ return 0;
+}
+
+static odp_buffer_hdr_t *queue_lf_deq(queue_t q_int)
+{
+ queue_entry_t *queue;
+ queue_lf_t *queue_lf;
+ int i, j, i_node;
+ int found;
+ ring_lf_node_t node_val, old_val, new_val;
+ ring_lf_node_t *node, *old;
+ uint64_t lowest;
+
+ queue = qentry_from_int(q_int);
+ queue_lf = queue->s.queue_lf;
+ i_node = 0;
+
+ for (j = 0; j < DEQ_RETRIES; j++) {
+ found = 0;
+ lowest = -1;
+
+ /* Find the head node. The one with data and
+ * the lowest counter. */
+ for (i = 0; i < RING_LF_SIZE; i++) {
+ i_node = next_idx(i_node);
+ node = &queue_lf->node[i_node];
+ node_val.u128 = atomic_load_u128(&node->u128);
+
+ if (node_val.s.mark == 1 && node_val.s.count < lowest) {
+ old = node;
+ old_val.u128 = node_val.u128;
+ lowest = node_val.s.count;
+ found = 1;
+ }
+ }
+
+ /* Queue is empty */
+ if (found == 0)
+ return NULL;
+
+ /* Try to remove data */
+ new_val.u128 = old_val.u128;
+ new_val.s.mark = 0;
+
+ if (atomic_cas_acq_u128(&old->u128, old_val.u128,
+ new_val.u128))
+ return (void *)(((uintptr_t)old_val.s.ptr) << 1);
+ }
+
+ return NULL;
+}
+
+static int queue_lf_deq_multi(queue_t q_int, odp_buffer_hdr_t **buf_hdr,
+ int num)
+{
+ odp_buffer_hdr_t *buf;
+
+ (void)num;
+
+ buf = queue_lf_deq(q_int);
+
+ if (buf == NULL)
+ return 0;
+
+ buf_hdr[0] = buf;
+ return 1;
+}
+
+uint32_t queue_lf_init_global(uint32_t *queue_lf_size,
+ queue_lf_func_t *lf_func)
+{
+ odp_shm_t shm;
+ bool lockfree = 0;
+
+ /* 16 byte lockfree CAS operation is needed. */
+#ifdef __SIZEOF_INT128__
+ lockfree = __atomic_is_lock_free(16, NULL);
+#endif
+
+ ODP_DBG("\nLock-free queue init\n");
+ ODP_DBG(" u128 lock-free: %i\n\n", lockfree);
+
+ if (!lockfree)
+ return 0;
+
+ shm = odp_shm_reserve("odp_queues_lf", sizeof(queue_lf_global_t),
+ ODP_CACHE_LINE_SIZE, 0);
+
+ queue_lf_glb = odp_shm_addr(shm);
+ memset(queue_lf_glb, 0, sizeof(queue_lf_global_t));
+
+ queue_lf_glb->shm = shm;
+
+ memset(lf_func, 0, sizeof(queue_lf_func_t));
+ lf_func->enq = queue_lf_enq;
+ lf_func->enq_multi = queue_lf_enq_multi;
+ lf_func->deq = queue_lf_deq;
+ lf_func->deq_multi = queue_lf_deq_multi;
+
+ *queue_lf_size = RING_LF_SIZE;
+
+ return QUEUE_LF_NUM;
+}
+
+void queue_lf_term_global(void)
+{
+ odp_shm_t shm;
+
+ if (queue_lf_glb == NULL)
+ return;
+
+ shm = queue_lf_glb->shm;
+
+ if (odp_shm_free(shm) < 0)
+ ODP_ERR("shm free failed");
+}
+
+static void init_queue(queue_lf_t *queue_lf)
+{
+ int i;
+
+ odp_atomic_init_u64(&queue_lf->enq_counter, 0);
+
+ for (i = 0; i < RING_LF_SIZE; i++)
+ atomic_zero_u128(&queue_lf->node[i].u128);
+}
+
+void *queue_lf_create(queue_entry_t *queue)
+{
+ int i;
+ queue_lf_t *queue_lf = NULL;
+
+ if (queue->s.type != ODP_QUEUE_TYPE_PLAIN)
+ return NULL;
+
+ for (i = 0; i < QUEUE_LF_NUM; i++) {
+ if (queue_lf_glb->queue_lf[i].used == 0) {
+ queue_lf = &queue_lf_glb->queue_lf[i];
+ memset(queue_lf, 0, sizeof(queue_lf_t));
+ init_queue(queue_lf);
+ queue_lf->used = 1;
+ break;
+ }
+ }
+
+ return queue_lf;
+}
+
+void queue_lf_destroy(void *queue_lf_ptr)
+{
+ queue_lf_t *queue_lf = queue_lf_ptr;
+
+ queue_lf->used = 0;
+}