@@ -485,8 +485,8 @@ static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
}
}
-static inline int enq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
- int num)
+static inline int _sched_queue_enq_multi(odp_queue_t handle,
+ odp_buffer_hdr_t *buf_hdr[], int num)
{
int sched = 0;
int ret;
@@ -532,17 +532,17 @@ static inline int enq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
return num_enq;
}
-static int queue_int_enq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
- int num)
+static int plain_queue_enq_multi(odp_queue_t handle,
+ odp_buffer_hdr_t *buf_hdr[], int num)
{
- return enq_multi(handle, buf_hdr, num);
+ return _sched_queue_enq_multi(handle, buf_hdr, num);
}
-static int queue_int_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
+static int plain_queue_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
{
int ret;
- ret = enq_multi(handle, &buf_hdr, 1);
+ ret = _sched_queue_enq_multi(handle, &buf_hdr, 1);
if (ret == 1)
return 0;
@@ -550,30 +550,8 @@ static int queue_int_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
return -1;
}
-static int queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
-{
- queue_entry_t *queue = qentry_from_handle(handle);
-
- if (odp_unlikely(num == 0))
- return 0;
-
- if (num > QUEUE_MULTI_MAX)
- num = QUEUE_MULTI_MAX;
-
- return queue->s.enqueue_multi(handle,
- (odp_buffer_hdr_t **)(uintptr_t)ev, num);
-}
-
-static int queue_enq(odp_queue_t handle, odp_event_t ev)
-{
- queue_entry_t *queue = qentry_from_handle(handle);
-
- return queue->s.enqueue(handle,
- (odp_buffer_hdr_t *)(uintptr_t)ev);
-}
-
-static inline int plain_queue_deq(odp_queue_t handle,
- odp_buffer_hdr_t *buf_hdr[], int num)
+static inline int _plain_queue_deq_multi(odp_queue_t handle,
+ odp_buffer_hdr_t *buf_hdr[], int num)
{
int num_deq;
queue_entry_t *queue;
@@ -603,18 +581,18 @@ static inline int plain_queue_deq(odp_queue_t handle,
return num_deq;
}
-static int queue_int_deq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
- int num)
+static int plain_queue_deq_multi(odp_queue_t handle,
+ odp_buffer_hdr_t *buf_hdr[], int num)
{
- return plain_queue_deq(handle, buf_hdr, num);
+ return _plain_queue_deq_multi(handle, buf_hdr, num);
}
-static odp_buffer_hdr_t *queue_int_deq(odp_queue_t handle)
+static odp_buffer_hdr_t *plain_queue_deq(odp_queue_t handle)
{
odp_buffer_hdr_t *buf_hdr = NULL;
int ret;
- ret = plain_queue_deq(handle, &buf_hdr, 1);
+ ret = _plain_queue_deq_multi(handle, &buf_hdr, 1);
if (ret == 1)
return buf_hdr;
@@ -622,89 +600,46 @@ static odp_buffer_hdr_t *queue_int_deq(odp_queue_t handle)
return NULL;
}
-static int queue_deq_multi(odp_queue_t handle, odp_event_t ev[], int num)
+static int error_enqueue(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
{
- queue_entry_t *queue = qentry_from_handle(handle);
+ (void)buf_hdr;
- if (num > QUEUE_MULTI_MAX)
- num = QUEUE_MULTI_MAX;
+ ODP_ERR("Enqueue not supported (" PRIu64 ")\n",
+ odp_queue_to_u64(handle));
- return queue->s.dequeue_multi(handle,
- (odp_buffer_hdr_t **)ev, num);
+ return -1;
}
-static odp_event_t queue_deq(odp_queue_t handle)
+static int error_enqueue_multi(odp_queue_t handle,
+ odp_buffer_hdr_t *buf_hdr[], int num)
{
- queue_entry_t *queue = qentry_from_handle(handle);
+ (void)buf_hdr;
+ (void)num;
- return (odp_event_t)queue->s.dequeue(handle);
+ ODP_ERR("Enqueue multi not supported (" PRIu64 ")\n",
+ odp_queue_to_u64(handle));
+
+ return -1;
}
-static int queue_init(queue_entry_t *queue, const char *name,
- const odp_queue_param_t *param)
+static odp_buffer_hdr_t *error_dequeue(odp_queue_t handle)
{
- uint64_t offset;
- uint32_t queue_size;
- int spsc;
-
- if (name == NULL) {
- queue->s.name[0] = 0;
- } else {
- strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1);
- queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0;
- }
- memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
- if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks())
- return -1;
-
- if (param->type == ODP_QUEUE_TYPE_SCHED)
- queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
-
- queue->s.type = queue->s.param.type;
+ ODP_ERR("Dequeue not supported (" PRIu64 ")\n",
+ odp_queue_to_u64(handle));
- queue->s.pktin = PKTIN_INVALID;
- queue->s.pktout = PKTOUT_INVALID;
-
- /* Use default size for all small queues to quarantee performance
- * level. */
- queue_size = queue_glb->config.default_queue_size;
- if (param->size > queue_glb->config.default_queue_size)
- queue_size = param->size;
-
- /* Round up if not already a power of two */
- queue_size = ROUNDUP_POWER2_U32(queue_size);
-
- if (queue_size > queue_glb->config.max_queue_size) {
- ODP_ERR("Too large queue size %u\n", queue_size);
- return -1;
- }
-
- offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
-
- /* Single-producer / single-consumer plain queue has simple and
- * lock-free implementation */
- spsc = (param->type == ODP_QUEUE_TYPE_PLAIN) &&
- (param->enq_mode == ODP_QUEUE_OP_MT_UNSAFE) &&
- (param->deq_mode == ODP_QUEUE_OP_MT_UNSAFE);
-
- queue->s.spsc = spsc;
- queue->s.queue_lf = NULL;
-
- if (spsc) {
- queue_spsc_init(queue, queue_size);
- } else {
- queue->s.enqueue = queue_int_enq;
- queue->s.dequeue = queue_int_deq;
- queue->s.enqueue_multi = queue_int_enq_multi;
- queue->s.dequeue_multi = queue_int_deq_multi;
+ return NULL;
+}
- queue->s.orig_dequeue_multi = queue_int_deq_multi;
+static int error_dequeue_multi(odp_queue_t handle,
+ odp_buffer_hdr_t *buf_hdr[], int num)
+{
+ (void)buf_hdr;
+ (void)num;
- ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset],
- queue_size);
- }
+ ODP_ERR("Dequeue multi not supported (" PRIu64 ")\n",
+ odp_queue_to_u64(handle));
- return 0;
+ return -1;
}
static void queue_param_init(odp_queue_param_t *params)
@@ -758,6 +693,24 @@ static int queue_info(odp_queue_t handle, odp_queue_info_t *info)
return 0;
}
+static int sched_queue_enq_multi(odp_queue_t handle,
+ odp_buffer_hdr_t *buf_hdr[], int num)
+{
+ return _sched_queue_enq_multi(handle, buf_hdr, num);
+}
+
+static int sched_queue_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
+{
+ int ret;
+
+ ret = _sched_queue_enq_multi(handle, &buf_hdr, 1);
+
+ if (ret == 1)
+ return 0;
+ else
+ return -1;
+}
+
int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
int update_status)
{
@@ -830,6 +783,87 @@ int sched_queue_empty(uint32_t queue_index)
return ret;
}
+static int queue_init(queue_entry_t *queue, const char *name,
+ const odp_queue_param_t *param)
+{
+ uint64_t offset;
+ uint32_t queue_size;
+ odp_queue_type_t queue_type;
+ int spsc;
+
+ queue_type = param->type;
+
+ if (name == NULL) {
+ queue->s.name[0] = 0;
+ } else {
+ strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1);
+ queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0;
+ }
+ memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
+ if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks())
+ return -1;
+
+ if (queue_type == ODP_QUEUE_TYPE_SCHED)
+ queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
+
+ queue->s.type = queue_type;
+
+ queue->s.pktin = PKTIN_INVALID;
+ queue->s.pktout = PKTOUT_INVALID;
+
+ /* Use default size for all small queues to quarantee performance
+ * level. */
+ queue_size = queue_glb->config.default_queue_size;
+ if (param->size > queue_glb->config.default_queue_size)
+ queue_size = param->size;
+
+ /* Round up if not already a power of two */
+ queue_size = ROUNDUP_POWER2_U32(queue_size);
+
+ if (queue_size > queue_glb->config.max_queue_size) {
+ ODP_ERR("Too large queue size %u\n", queue_size);
+ return -1;
+ }
+
+ offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
+
+ /* Single-producer / single-consumer plain queue has simple and
+ * lock-free implementation */
+ spsc = (queue_type == ODP_QUEUE_TYPE_PLAIN) &&
+ (param->enq_mode == ODP_QUEUE_OP_MT_UNSAFE) &&
+ (param->deq_mode == ODP_QUEUE_OP_MT_UNSAFE);
+
+ queue->s.spsc = spsc;
+ queue->s.queue_lf = NULL;
+
+ /* Default to error functions */
+ queue->s.enqueue = error_enqueue;
+ queue->s.enqueue_multi = error_enqueue_multi;
+ queue->s.dequeue = error_dequeue;
+ queue->s.dequeue_multi = error_dequeue_multi;
+ queue->s.orig_dequeue_multi = error_dequeue_multi;
+
+ if (spsc) {
+ queue_spsc_init(queue, queue_size);
+ } else {
+ if (queue_type == ODP_QUEUE_TYPE_PLAIN) {
+ queue->s.enqueue = plain_queue_enq;
+ queue->s.enqueue_multi = plain_queue_enq_multi;
+ queue->s.dequeue = plain_queue_deq;
+ queue->s.dequeue_multi = plain_queue_deq_multi;
+ queue->s.orig_dequeue_multi = plain_queue_deq_multi;
+ } else {
+ queue->s.enqueue = sched_queue_enq;
+ queue->s.enqueue_multi = sched_queue_enq_multi;
+ }
+
+ ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset],
+ queue_size);
+ }
+
+ return 0;
+}
+
static uint64_t queue_to_u64(odp_queue_t hdl)
{
return _odp_pri(hdl);
@@ -894,6 +928,47 @@ static int queue_orig_multi(odp_queue_t handle,
return queue->s.orig_dequeue_multi(handle, buf_hdr, num);
}
+static int queue_api_enq_multi(odp_queue_t handle,
+ const odp_event_t ev[], int num)
+{
+ queue_entry_t *queue = qentry_from_handle(handle);
+
+ if (odp_unlikely(num == 0))
+ return 0;
+
+ if (num > QUEUE_MULTI_MAX)
+ num = QUEUE_MULTI_MAX;
+
+ return queue->s.enqueue_multi(handle,
+ (odp_buffer_hdr_t **)(uintptr_t)ev, num);
+}
+
+static int queue_api_enq(odp_queue_t handle, odp_event_t ev)
+{
+ queue_entry_t *queue = qentry_from_handle(handle);
+
+ return queue->s.enqueue(handle,
+ (odp_buffer_hdr_t *)(uintptr_t)ev);
+}
+
+static int queue_api_deq_multi(odp_queue_t handle, odp_event_t ev[], int num)
+{
+ queue_entry_t *queue = qentry_from_handle(handle);
+
+ if (num > QUEUE_MULTI_MAX)
+ num = QUEUE_MULTI_MAX;
+
+ return queue->s.dequeue_multi(handle,
+ (odp_buffer_hdr_t **)ev, num);
+}
+
+static odp_event_t queue_api_deq(odp_queue_t handle)
+{
+ queue_entry_t *queue = qentry_from_handle(handle);
+
+ return (odp_event_t)queue->s.dequeue(handle);
+}
+
/* API functions */
_odp_queue_api_fn_t queue_basic_api = {
.queue_create = queue_create,
@@ -901,10 +976,10 @@ _odp_queue_api_fn_t queue_basic_api = {
.queue_lookup = queue_lookup,
.queue_capability = queue_capability,
.queue_context_set = queue_context_set,
- .queue_enq = queue_enq,
- .queue_enq_multi = queue_enq_multi,
- .queue_deq = queue_deq,
- .queue_deq_multi = queue_deq_multi,
+ .queue_enq = queue_api_enq,
+ .queue_enq_multi = queue_api_enq_multi,
+ .queue_deq = queue_api_deq,
+ .queue_deq_multi = queue_api_deq_multi,
.queue_type = queue_type,
.queue_sched_type = queue_sched_type,
.queue_sched_prio = queue_sched_prio,